IOR
aiori-CEPHFS.c
Go to the documentation of this file.
1 /* -*- mode: c; c-basic-offset: 8; indent-tabs-mode: nil; -*-
2  * vim:expandtab:shiftwidth=8:tabstop=8:
3  */
4 /******************************************************************************\
5 * *
6 * (C) 2015 The University of Chicago *
7 * (C) 2020 Red Hat, Inc. *
8 * *
9 * See COPYRIGHT in top-level directory. *
10 * *
11 ********************************************************************************
12 *
13 * Implement abstract I/O interface for CEPHFS.
14 *
15 \******************************************************************************/
16 
#ifdef HAVE_CONFIG_H
# include "config.h"
#endif

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <sys/stat.h>
#include <cephfs/libcephfs.h>

#include "ior.h"
#include "iordef.h"
#include "aiori.h"
#include "utilities.h"
30 
/*
 * Open flags handed to ceph_open(). The values are octal bit masks; the
 * access-mode flags occupy the two lowest bits and CEPH_O_RDONLY is zero.
 */
#define CEPH_O_RDONLY    0000000  /* read only (no bits set) */
#define CEPH_O_WRONLY    0000001  /* write only */
#define CEPH_O_RDWR      0000002  /* read and write */
#define CEPH_O_CREAT     0000100  /* create the file */
#define CEPH_O_EXCL      0000200  /* exclusive create */
#define CEPH_O_TRUNC     0001000  /* truncate on open */
#define CEPH_O_LAZY      0020000
#define CEPH_O_DIRECTORY 0200000
#define CEPH_O_NOFOLLOW  0400000
40 
41 /************************** O P T I O N S *****************************/
/*
 * Backend-specific settings. Each member is the storage target of one
 * "cephfs.*" entry in the options[] table.
 */
struct cephfs_options {
        char *user;     /* cephfs.user:   user name for the ceph cluster */
        char *conf;     /* cephfs.conf:   config file for the ceph cluster */
        char *prefix;   /* cephfs.prefix: mount prefix stripped by pfix() */
};

/* All settings start out unset. */
static struct cephfs_options o = {
        .user = NULL,
        .conf = NULL,
        .prefix = NULL,
};
53 
54 static option_help options [] = {
55  {0, "cephfs.user", "Username for the ceph cluster", OPTION_REQUIRED_ARGUMENT, 's', & o.user},
56  {0, "cephfs.conf", "Config file for the ceph cluster", OPTION_REQUIRED_ARGUMENT, 's', & o.conf},
57  {0, "cephfs.prefix", "mount prefix", OPTION_OPTIONAL_ARGUMENT, 's', & o.prefix},
59 };
60 
/*
 * The single CephFS mount handle shared by every function in this file.
 * NULL while no handle exists; set by CEPHFS_Init(), cleared by CEPHFS_Final().
 */
static struct ceph_mount_info *cmount = NULL;
62 
63 /**************************** P R O T O T Y P E S *****************************/
64 static void CEPHFS_Init();
65 static void CEPHFS_Final();
66 static void *CEPHFS_Create(char *, IOR_param_t *);
67 static void *CEPHFS_Open(char *, IOR_param_t *);
68 static IOR_offset_t CEPHFS_Xfer(int, void *, IOR_size_t *,
70 static void CEPHFS_Close(void *, IOR_param_t *);
71 static void CEPHFS_Delete(char *, IOR_param_t *);
72 static void CEPHFS_Fsync(void *, IOR_param_t *);
73 static IOR_offset_t CEPHFS_GetFileSize(IOR_param_t *, MPI_Comm, char *);
74 static int CEPHFS_StatFS(const char *, ior_aiori_statfs_t *, IOR_param_t *);
75 static int CEPHFS_MkDir(const char *, mode_t, IOR_param_t *);
76 static int CEPHFS_RmDir(const char *, IOR_param_t *);
77 static int CEPHFS_Access(const char *, int, IOR_param_t *);
78 static int CEPHFS_Stat(const char *, struct stat *, IOR_param_t *);
79 static void CEPHFS_Sync(IOR_param_t *);
80 static option_help * CEPHFS_options();
81 
82 /************************** D E C L A R A T I O N S ***************************/
84  .name = "CEPHFS",
85  .name_legacy = NULL,
86  .initialize = CEPHFS_Init,
87  .finalize = CEPHFS_Final,
88  .create = CEPHFS_Create,
89  .open = CEPHFS_Open,
90  .xfer = CEPHFS_Xfer,
91  .close = CEPHFS_Close,
92  .delete = CEPHFS_Delete,
93  .get_version = aiori_get_version,
94  .fsync = CEPHFS_Fsync,
95  .get_file_size = CEPHFS_GetFileSize,
96  .statfs = CEPHFS_StatFS,
97  .mkdir = CEPHFS_MkDir,
98  .rmdir = CEPHFS_RmDir,
99  .access = CEPHFS_Access,
100  .stat = CEPHFS_Stat,
101  .sync = CEPHFS_Sync,
102  .get_options = CEPHFS_options,
103 };
104 
/*
 * Report a libcephfs failure through ERR().
 *
 * rc is the negative error code returned by a libcephfs call; it is negated
 * and stored in errno before ERR(msg) is invoked. The argument is
 * parenthesised so that an expression such as `a - b` is negated as a whole.
 */
#define CEPHFS_ERR(msg, rc) do { \
        errno = -(rc); \
        ERR(msg); \
} while(0)
109 
110 /***************************** F U N C T I O N S ******************************/
111 static const char* pfix(const char* path) {
112  const char* npath = path;
113  const char* prefix = o.prefix;
114  while (*prefix) {
115  if(*prefix++ != *npath++) {
116  return path;
117  }
118  }
119  return npath;
120 }
121 
123  return options;
124 }
125 
126 static void CEPHFS_Init()
127 {
128  /* Short circuit if the options haven't been filled yet. */
129  if (!o.user || !o.conf || !o.prefix) {
130  WARN("CEPHFS_Init() called before options have been populated!");
131  return;
132  }
133 
134  /* Short circuit if the mount handle already exists */
135  if (cmount) {
136  return;
137  }
138 
139  int ret;
140  /* create CEPHFS mount handle */
141  ret = ceph_create(&cmount, o.user);
142  if (ret) {
143  CEPHFS_ERR("unable to create CEPHFS mount handle", ret);
144  }
145 
146  /* set the handle using the Ceph config */
147  ret = ceph_conf_read_file(cmount, o.conf);
148  if (ret) {
149  CEPHFS_ERR("unable to read ceph config file", ret);
150  }
151 
152  /* mount the handle */
153  ret = ceph_mount(cmount, "/");
154  if (ret) {
155  CEPHFS_ERR("unable to mount cephfs", ret);
156  ceph_shutdown(cmount);
157 
158  }
159 
160  Inode *root;
161 
162  /* try retrieving the root cephfs inode */
163  ret = ceph_ll_lookup_root(cmount, &root);
164  if (ret) {
165  CEPHFS_ERR("uanble to retrieve root cephfs inode", ret);
166  ceph_shutdown(cmount);
167 
168  }
169 
170  return;
171 }
172 
173 static void CEPHFS_Final()
174 {
175  /* shutdown */
176  int ret = ceph_unmount(cmount);
177  if (ret < 0) {
178  CEPHFS_ERR("ceph_umount failed", ret);
179  }
180  ret = ceph_release(cmount);
181  if (ret < 0) {
182  CEPHFS_ERR("ceph_release failed", ret);
183  }
184  cmount = NULL;
185 }
186 
187 static void *CEPHFS_Create(char *testFileName, IOR_param_t * param)
188 {
189  return CEPHFS_Open(testFileName, param);
190 }
191 
192 static void *CEPHFS_Open(char *testFileName, IOR_param_t * param)
193 {
194  const char *file = pfix(testFileName);
195  int* fd;
196  fd = (int *)malloc(sizeof(int));
197 
198  mode_t mode = 0664;
199  int flags = (int) 0;
200 
201  /* set IOR file flags to CephFS flags */
202  /* -- file open flags -- */
203  if (param->openFlags & IOR_RDONLY) {
204  flags |= CEPH_O_RDONLY;
205  }
206  if (param->openFlags & IOR_WRONLY) {
207  flags |= CEPH_O_WRONLY;
208  }
209  if (param->openFlags & IOR_RDWR) {
210  flags |= CEPH_O_RDWR;
211  }
212  if (param->openFlags & IOR_APPEND) {
213  fprintf(stdout, "File append not implemented in CephFS\n");
214  }
215  if (param->openFlags & IOR_CREAT) {
216  flags |= CEPH_O_CREAT;
217  }
218  if (param->openFlags & IOR_EXCL) {
219  flags |= CEPH_O_EXCL;
220  }
221  if (param->openFlags & IOR_TRUNC) {
222  flags |= CEPH_O_TRUNC;
223  }
224  if (param->openFlags & IOR_DIRECT) {
225  fprintf(stdout, "O_DIRECT not implemented in CephFS\n");
226  }
227  *fd = ceph_open(cmount, file, flags, mode);
228  if (*fd < 0) {
229  CEPHFS_ERR("ceph_open failed", *fd);
230  }
231  return (void *) fd;
232 }
233 
234 static IOR_offset_t CEPHFS_Xfer(int access, void *file, IOR_size_t * buffer,
235  IOR_offset_t length, IOR_param_t * param)
236 {
237  uint64_t size = (uint64_t) length;
238  char *buf = (char *) buffer;
239  int fd = *(int *) file;
240  int ret;
241 
242  if (access == WRITE)
243  {
244  ret = ceph_write(cmount, fd, buf, size, param->offset);
245  if (ret < 0) {
246  CEPHFS_ERR("unable to write file to CephFS", ret);
247  } else if (ret < size) {
248  CEPHFS_ERR("short write to CephFS", ret);
249  }
250  if (param->fsyncPerWrite == TRUE) {
251  CEPHFS_Fsync(&fd, param);
252  }
253  }
254  else /* READ */
255  {
256  ret = ceph_read(cmount, fd, buf, size, param->offset);
257  if (ret < 0) {
258  CEPHFS_ERR("unable to read file from CephFS", ret);
259  } else if (ret < size) {
260  CEPHFS_ERR("short read from CephFS", ret);
261  }
262 
263  }
264  return length;
265 }
266 
267 static void CEPHFS_Fsync(void *file, IOR_param_t * param)
268 {
269  int fd = *(int *) file;
270  int ret = ceph_fsync(cmount, fd, 0);
271  if (ret < 0) {
272  CEPHFS_ERR("ceph_fsync failed", ret);
273  }
274 }
275 
276 static void CEPHFS_Close(void *file, IOR_param_t * param)
277 {
278  int fd = *(int *) file;
279  int ret = ceph_close(cmount, fd);
280  if (ret < 0) {
281  CEPHFS_ERR("ceph_close failed", ret);
282  }
283  free(file);
284  return;
285 }
286 
287 static void CEPHFS_Delete(char *testFileName, IOR_param_t * param)
288 {
289  int ret = ceph_unlink(cmount, pfix(testFileName));
290  if (ret < 0) {
291  CEPHFS_ERR("ceph_unlink failed", ret);
292  }
293  return;
294 }
295 
297  char *testFileName)
298 {
299  struct stat stat_buf;
300  IOR_offset_t aggFileSizeFromStat, tmpMin, tmpMax, tmpSum;
301 
302  int ret = ceph_stat(cmount, pfix(testFileName), &stat_buf);
303  if (ret < 0) {
304  CEPHFS_ERR("ceph_stat failed", ret);
305  }
306  aggFileSizeFromStat = stat_buf.st_size;
307 
308  if (param->filePerProc == TRUE) {
309  MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpSum, 1,
310  MPI_LONG_LONG_INT, MPI_SUM, testComm),
311  "cannot total data moved");
312  aggFileSizeFromStat = tmpSum;
313  } else {
314  MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpMin, 1,
315  MPI_LONG_LONG_INT, MPI_MIN, testComm),
316  "cannot total data moved");
317  MPI_CHECK(MPI_Allreduce(&aggFileSizeFromStat, &tmpMax, 1,
318  MPI_LONG_LONG_INT, MPI_MAX, testComm),
319  "cannot total data moved");
320  if (tmpMin != tmpMax) {
321  if (rank == 0) {
322  WARN("inconsistent file size by different tasks");
323  }
324  /* incorrect, but now consistent across tasks */
325  aggFileSizeFromStat = tmpMin;
326  }
327  }
328 
329  return (aggFileSizeFromStat);
330 
331 }
332 
333 static int CEPHFS_StatFS(const char *path, ior_aiori_statfs_t *stat_buf,
334  IOR_param_t *param)
335 {
336 #if defined(HAVE_STATVFS)
337  struct statvfs statfs_buf;
338  int ret = ceph_statfs(cmount, pfix(path), &statfs_buf);
339  if (ret < 0) {
340  CEPHFS_ERR("ceph_statfs failed", ret);
341  return -1;
342  }
343 
344  stat_buf->f_bsize = statfs_buf.f_bsize;
345  stat_buf->f_blocks = statfs_buf.f_blocks;
346  stat_buf->f_bfree = statfs_buf.f_bfree;
347  stat_buf->f_files = statfs_buf.f_files;
348  stat_buf->f_ffree = statfs_buf.f_ffree;
349 
350  return 0;
351 #else
352  WARN("ceph_statfs requires statvfs!");
353  return -1;
354 #endif
355 }
356 
357 static int CEPHFS_MkDir(const char *path, mode_t mode, IOR_param_t *param)
358 {
359  return ceph_mkdir(cmount, pfix(path), mode);
360 }
361 
362 static int CEPHFS_RmDir(const char *path, IOR_param_t *param)
363 {
364  return ceph_rmdir(cmount, pfix(path));
365 }
366 
367 static int CEPHFS_Access(const char *testFileName, int mode, IOR_param_t *param)
368 {
369  struct stat buf;
370  return ceph_stat(cmount, pfix(testFileName), &buf);
371 }
372 
373 static int CEPHFS_Stat(const char *testFileName, struct stat *buf, IOR_param_t *param)
374 {
375  return ceph_stat(cmount, pfix(testFileName), buf);
376 }
377 
378 static void CEPHFS_Sync(IOR_param_t *param)
379 {
380  int ret = ceph_sync_fs(cmount);
381  if (ret < 0) {
382  CEPHFS_ERR("ceph_sync_fs failed", ret);
383  }
384 
385 }
#define CEPH_O_TRUNC
Definition: aiori-CEPHFS.c:36
uint64_t f_blocks
Definition: aiori.h:53
uint64_t f_bfree
Definition: aiori.h:54
#define LAST_OPTION
Definition: option.h:39
static struct ceph_mount_info * cmount
Definition: aiori-CEPHFS.c:61
int filePerProc
Definition: ior.h:125
#define CEPH_O_WRONLY
Definition: aiori-CEPHFS.c:32
static int CEPHFS_StatFS(const char *, ior_aiori_statfs_t *, IOR_param_t *)
Definition: aiori-CEPHFS.c:333
static void CEPHFS_Sync(IOR_param_t *)
Definition: aiori-CEPHFS.c:378
static int CEPHFS_Stat(const char *, struct stat *, IOR_param_t *)
Definition: aiori-CEPHFS.c:373
ior_aiori_t cephfs_aiori
Definition: aiori-CEPHFS.c:83
static IOR_offset_t CEPHFS_Xfer(int, void *, IOR_size_t *, IOR_offset_t, IOR_param_t *)
Definition: aiori-CEPHFS.c:234
uint64_t f_ffree
Definition: aiori.h:57
#define IOR_APPEND
Definition: aiori.h:31
#define CEPHFS_ERR(__err_str, __ret)
Definition: aiori-CEPHFS.c:105
#define IOR_RDONLY
Definition: aiori.h:28
int fsyncPerWrite
Definition: ior.h:170
#define MPI_CHECK(MPI_STATUS, MSG)
Definition: aiori-debug.h:127
#define WRITE
Definition: iordef.h:86
static void CEPHFS_Init()
Definition: aiori-CEPHFS.c:126
static void CEPHFS_Fsync(void *, IOR_param_t *)
Definition: aiori-CEPHFS.c:267
static int CEPHFS_RmDir(const char *, IOR_param_t *)
Definition: aiori-CEPHFS.c:362
#define IOR_CREAT
Definition: aiori.h:32
static int CEPHFS_Access(const char *, int, IOR_param_t *)
Definition: aiori-CEPHFS.c:367
#define IOR_EXCL
Definition: aiori.h:34
char * aiori_get_version()
Definition: aiori.c:232
static void CEPHFS_Final()
Definition: aiori-CEPHFS.c:173
uint64_t f_files
Definition: aiori.h:56
MPI_Comm testComm
Definition: utilities.c:71
static option_help options[]
Definition: aiori-CEPHFS.c:54
#define IOR_TRUNC
Definition: aiori.h:33
uint64_t f_bsize
Definition: aiori.h:52
#define WARN(MSG)
Definition: aiori-debug.h:32
static void * CEPHFS_Create(char *, IOR_param_t *)
Definition: aiori-CEPHFS.c:187
#define CEPH_O_RDWR
Definition: aiori-CEPHFS.c:33
static void CEPHFS_Delete(char *, IOR_param_t *)
Definition: aiori-CEPHFS.c:287
static IOR_offset_t CEPHFS_GetFileSize(IOR_param_t *, MPI_Comm, char *)
Definition: aiori-CEPHFS.c:296
#define IOR_WRONLY
Definition: aiori.h:29
static void CEPHFS_Close(void *, IOR_param_t *)
Definition: aiori-CEPHFS.c:276
#define CEPH_O_RDONLY
Definition: aiori-CEPHFS.c:31
long long int IOR_size_t
Definition: iordef.h:110
#define CEPH_O_CREAT
Definition: aiori-CEPHFS.c:34
static option_help * CEPHFS_options()
Definition: aiori-CEPHFS.c:122
static const char * pfix(const char *path)
Definition: aiori-CEPHFS.c:111
#define IOR_RDWR
Definition: aiori.h:30
static int CEPHFS_MkDir(const char *, mode_t, IOR_param_t *)
Definition: aiori-CEPHFS.c:357
char * name
Definition: aiori.h:88
#define CEPH_O_EXCL
Definition: aiori-CEPHFS.c:35
long long int IOR_offset_t
Definition: iordef.h:109
int rank
Definition: utilities.c:68
#define TRUE
Definition: iordef.h:66
#define IOR_DIRECT
Definition: aiori.h:35
static struct cephfs_options o
Definition: aiori-CEPHFS.c:48
#define NULL
Definition: iordef.h:70
static void * CEPHFS_Open(char *, IOR_param_t *)
Definition: aiori-CEPHFS.c:192