GraphChi  0.1
stripedio.hpp
#ifndef DEF_STRIPEDIO_HPP
#define DEF_STRIPEDIO_HPP

#include <iostream>

#include <fcntl.h>
#include <unistd.h>
#include <assert.h>
#include <stdint.h>
#include <pthread.h>
#include <errno.h>
#include <sys/time.h>   // gettimeofday
//#include <omp.h>

#include <cstring>      // strlen, strerror
#include <cstdlib>      // malloc, free, abs, random
#include <cstdio>       // sprintf
#include <algorithm>    // std::min
#include <string>
#include <vector>

#include "logger/logger.hpp"
#include "metrics/metrics.hpp"
#include "util/synchronized_queue.hpp"
#include "util/ioutil.hpp"
#include "util/cmdopts.hpp"



namespace graphchi {

    static size_t get_filesize(std::string filename);
    struct pinned_file;

    struct io_descriptor {
        std::string filename;
        std::vector<int> readdescs;
        std::vector<int> writedescs;
        pinned_file * pinned_to_memory;
        int start_mplex;
        bool open;
    };


    enum BLOCK_ACTION { READ, WRITE };

    // Very simple ref count system
    struct refcountptr {
        char * ptr;
        volatile int count;
        refcountptr(char * ptr, int count) : ptr(ptr), count(count) {}
    };

    // Forward declaration
    class stripedio;

    struct iotask {
        BLOCK_ACTION action;
        int fd;

        refcountptr * ptr;
        size_t length;
        size_t offset;
        size_t ptroffset;
        bool free_after;
        stripedio * iomgr;

        iotask() : action(READ), fd(0), ptr(NULL), length(0), offset(0), ptroffset(0), free_after(false), iomgr(NULL) {}
        iotask(stripedio * iomgr, BLOCK_ACTION act, int fd, refcountptr * ptr, size_t length, size_t offset, size_t ptroffset, bool free_after=false) :
            action(act), fd(fd), ptr(ptr), length(length), offset(offset), ptroffset(ptroffset), free_after(free_after), iomgr(iomgr) {}
    };

    struct thrinfo {
        synchronized_queue<iotask> * readqueue;
        synchronized_queue<iotask> * commitqueue;
        synchronized_queue<iotask> * prioqueue;

        bool running;
        metrics * m;
        volatile int pending_writes;
        volatile int pending_reads;
        int mplex;
    };

    // Forward declaration
    static void * io_thread_loop(void * _info);

    struct stripe_chunk {
        int mplex_thread;
        size_t offset;
        size_t len;
        stripe_chunk(int mplex_thread, size_t offset, size_t len) : mplex_thread(mplex_thread), offset(offset), len(len) {}
    };


    struct streaming_task {
        stripedio * iomgr;
        int session;
        size_t len;
        volatile size_t curpos;
        char ** buf;
        streaming_task() {}
        streaming_task(stripedio * iomgr, int session, size_t len, char ** buf) : iomgr(iomgr), session(session), len(len), curpos(0), buf(buf) {}
    };

    struct pinned_file {
        std::string filename;
        size_t length;
        uint8_t * data;
        bool touched;
    };

    // Forward declaration
    static void * stream_read_loop(void * _info);

    class stripedio {

        std::vector<io_descriptor *> sessions;
        mutex mlock;
        int blocksize;
        int stripesize;
        int multiplex;
        std::string multiplex_root;
        bool disable_preloading;

        std::vector< synchronized_queue<iotask> > mplex_readtasks;
        std::vector< synchronized_queue<iotask> > mplex_writetasks;
        std::vector< synchronized_queue<iotask> > mplex_priotasks;
        std::vector< pthread_t > threads;
        std::vector< thrinfo * > thread_infos;
        metrics &m;

        /* Memory-pinned files */
        std::vector<pinned_file *> preloaded_files;
        mutex preload_lock;
        size_t preloaded_bytes;
        size_t max_preload_bytes;


        int niothreads; // threads per mplex

    public:
        stripedio(metrics &_m) : m(_m) {
            disable_preloading = false;
            blocksize = get_option_int("io.blocksize", 1024 * 1024);
            stripesize = get_option_int("io.stripesize", blocksize / 2);
            preloaded_bytes = 0;
            max_preload_bytes = 1024 * 1024 * get_option_long("preload.max_megabytes", 0);

            if (max_preload_bytes > 0) {
                logstream(LOG_INFO) << "Preloading maximum " << max_preload_bytes << " bytes." << std::endl;
            }

            multiplex = get_option_int("multiplex", 1);
            if (multiplex > 1) {
                multiplex_root = get_option_string("multiplex_root", "<not-set>");
            } else {
                multiplex_root = "";
                stripesize = 1024 * 1024 * 1024;
            }
            m.set("stripesize", (size_t)stripesize);

            // Start threads (niothreads is now threads per multiplex)
            niothreads = get_option_int("niothreads", 1);
            m.set("niothreads", (size_t)niothreads);

            // Each multiplex partition has its own queues
            for(int i=0; i < multiplex * niothreads; i++) {
                mplex_readtasks.push_back(synchronized_queue<iotask>());
                mplex_writetasks.push_back(synchronized_queue<iotask>());
                mplex_priotasks.push_back(synchronized_queue<iotask>());
            }

            int k = 0;
            for(int i=0; i < multiplex; i++) {
                for(int j=0; j < niothreads; j++) {
                    thrinfo * cthreadinfo = new thrinfo();
                    cthreadinfo->commitqueue = &mplex_writetasks[k];
                    cthreadinfo->readqueue = &mplex_readtasks[k];
                    cthreadinfo->prioqueue = &mplex_priotasks[k];
                    cthreadinfo->running = true;
                    cthreadinfo->pending_writes = 0;
                    cthreadinfo->pending_reads = 0;
                    cthreadinfo->mplex = i;
                    cthreadinfo->m = &m;
                    thread_infos.push_back(cthreadinfo);

                    pthread_t iothread;
                    int ret = pthread_create(&iothread, NULL, io_thread_loop, cthreadinfo);
                    threads.push_back(iothread);
                    assert(ret == 0);   // pthread_create returns 0 on success
                    k++;
                }
            }
        }
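
        /* For reference, the tunables read by the constructor above, with the defaults
         * passed to the get_option_* calls (how they are supplied is up to cmdopts.hpp):
         *
         *   io.blocksize           default 1048576 bytes (1 MB)
         *   io.stripesize          default blocksize/2; forced to 1 GB when multiplex == 1
         *   preload.max_megabytes  default 0 (no memory-pinning of files)
         *   multiplex              default 1 (number of partitions/disks to stripe over)
         *   multiplex_root         default "<not-set>"; used only when multiplex > 1
         *   niothreads             default 1 (I/O threads per multiplex partition)
         */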

        ~stripedio() {
            int mplex = (int) thread_infos.size();
            // Quit all threads
            for(int i=0; i < mplex; i++) {
                thread_infos[i]->running = false;
            }
            size_t nthreads = threads.size();
            for(unsigned int i=0; i < nthreads; i++) {
                pthread_join(threads[i], NULL);
            }
            for(int i=0; i < mplex; i++) {
                delete thread_infos[i];
            }

            for(int j=0; j < (int)sessions.size(); j++) {
                if (sessions[j] != NULL) {
                    delete sessions[j];
                    sessions[j] = NULL;
                }
            }

            for(std::vector<pinned_file *>::iterator it=preloaded_files.begin();
                        it != preloaded_files.end(); ++it) {
                pinned_file * preloaded = (*it);
                free(preloaded->data);   // allocated with malloc() in allow_preloading()
                delete preloaded;
            }
        }

        void set_disable_preloading(bool b) {
            disable_preloading = b;
            if (b) logstream(LOG_INFO) << "Disabled preloading." << std::endl;
        }

        bool multiplexed() {
            return multiplex > 1;
        }

        void print_session(int session) {
            for(int i=0; i < multiplex; i++) {
                std::cout << "multiplex: " << multiplex << std::endl;
                std::cout << "Read desc: " << sessions[session]->readdescs[i] << std::endl;
            }

            for(int i=0; i < (int)sessions[session]->writedescs.size(); i++) {
                std::cout << "multiplex: " << multiplex << std::endl;
                std::cout << "Write desc: " << sessions[session]->writedescs[i] << std::endl;
            }
        }

        // Compute a hash for the filename which is used for
        // permuting the stripes. It is important that the permutation
        // is the same regardless of when the file is opened.
        int hash(std::string filename) {
            const char * cstr = filename.c_str();
            int hash = 1;
            int l = (int) strlen(cstr);
            for(int i=0; i < l; i++) {
                hash = 31 * hash + cstr[i];
            }
            return std::abs(hash);
        }
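
        /* Example: with multiplex == 2, a filename hashing to 7 gets
         * start_mplex = 7 % 2 = 1 (see open_session below), so its first stripe is
         * placed on partition 1 and later stripes alternate 0, 1, 0, ... The permutation
         * depends only on the filename, so reopening the file finds the same layout. */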

        int open_session(std::string filename, bool readonly=false) {
            mlock.lock();
            // FIXME: known memory leak: sessions table is never shrunk
            int session_id = (int) sessions.size();
            io_descriptor * iodesc = new io_descriptor();
            iodesc->open = true;
            iodesc->pinned_to_memory = is_preloaded(filename);
            iodesc->start_mplex = hash(filename) % multiplex;
            sessions.push_back(iodesc);
            mlock.unlock();

            if (NULL != iodesc->pinned_to_memory) {
                logstream(LOG_INFO) << "Opened preloaded session: " << filename << std::endl;
                return session_id;
            }

            for(int i=0; i < multiplex; i++) {
                std::string fname = multiplexprefix(i) + filename;
                for(int j=0; j < niothreads + (multiplex == 1 ? 1 : 0); j++) { // Hack to have one extra fd for synchronous I/O
                    int rddesc = open(fname.c_str(), O_RDONLY);
                    if (rddesc < 0) logstream(LOG_ERROR) << "Could not open: " << fname << " session: " << session_id
                        << " error: " << strerror(errno) << std::endl;
                    assert(rddesc >= 0);
                    iodesc->readdescs.push_back(rddesc);
#ifdef F_NOCACHE
                    if (!readonly)
                        fcntl(rddesc, F_NOCACHE, 1);
#endif
                    if (!readonly) {
                        int wrdesc = open(fname.c_str(), O_RDWR);

                        if (wrdesc < 0) logstream(LOG_ERROR) << "Could not open for writing: " << fname << " session: " << session_id
                            << " error: " << strerror(errno) << std::endl;
                        assert(wrdesc >= 0);
#ifdef F_NOCACHE
                        fcntl(wrdesc, F_NOCACHE, 1);
#endif
                        iodesc->writedescs.push_back(wrdesc);
                    }
                }
            }
            iodesc->filename = filename;
            if (iodesc->writedescs.size() > 0) {
                logstream(LOG_INFO) << "Opened write-session: " << session_id << " (" << iodesc->writedescs[0] << ") for " << filename << std::endl;
            } else {
                logstream(LOG_INFO) << "Opened read-session: " << session_id << " (" << iodesc->readdescs[0] << ") for " << filename << std::endl;
            }
            return session_id;
        }

        void close_session(int session) {
            mlock.lock();
            // Note: currently io-descriptors are left in the sessions vector
            // on purpose, to make managed memory work. Should be fixed, as this is
            // a (relatively minor) memory leak.
            bool wasopen;
            io_descriptor * iodesc = sessions[session];
            wasopen = iodesc->open;
            iodesc->open = false;
            mlock.unlock();
            if (wasopen) {
                for(std::vector<int>::iterator it=iodesc->readdescs.begin(); it != iodesc->readdescs.end(); ++it) {
                    close(*it);
                }
                for(std::vector<int>::iterator it=iodesc->writedescs.begin(); it != iodesc->writedescs.end(); ++it) {
                    close(*it);
                }
            }
        }

        int mplex_for_offset(int session, size_t off) {
            return ((int) (off / stripesize) + sessions[session]->start_mplex) % multiplex;
        }

        // Returns a vector of <mplex, offset> chunks
        std::vector< stripe_chunk > stripe_offsets(int session, size_t nbytes, size_t off) {
            size_t end = off + nbytes;
            size_t idx = off;
            size_t bufoff = 0;
            std::vector<stripe_chunk> stripelist;
            while(idx < end) {
                size_t blockoff = idx % stripesize;
                size_t blocklen = std::min(stripesize - blockoff, end - idx);

                int mplex_thread = (int) mplex_for_offset(session, idx) * niothreads + (int) (random() % niothreads);
                stripelist.push_back(stripe_chunk(mplex_thread, bufoff, blocklen));

                bufoff += blocklen;
                idx += blocklen;
            }
            return stripelist;
        }
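
        /* Worked example (illustrative numbers, not the defaults): with stripesize = 1024
         * and a request of nbytes = 2500 starting at off = 300, the loop above produces
         * three chunks:
         *   buffer offset    0, length  724  (file bytes  300..1023)
         *   buffer offset  724, length 1024  (file bytes 1024..2047)
         *   buffer offset 1748, length  752  (file bytes 2048..2799)
         * Each chunk is assigned to the I/O thread group of the multiplex partition that
         * owns its stripe, plus a random thread index within that group. */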

        template <typename T>
        void preada_async(int session, T * tbuf, size_t nbytes, size_t off) {
            std::vector<stripe_chunk> stripelist = stripe_offsets(session, nbytes, off);
            refcountptr * refptr = new refcountptr((char*)tbuf, (int)stripelist.size());
            for(int i=0; i < (int)stripelist.size(); i++) {
                stripe_chunk chunk = stripelist[i];
                __sync_add_and_fetch(&thread_infos[chunk.mplex_thread]->pending_reads, 1);
                mplex_readtasks[chunk.mplex_thread].push(iotask(this, READ, sessions[session]->readdescs[chunk.mplex_thread],
                        refptr, chunk.len, chunk.offset + off, chunk.offset));
            }
        }
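
        /* Lifecycle note: the refcountptr created above starts with one reference per
         * chunk. An I/O thread decrements it in io_thread_loop() when its chunk
         * completes (for writes only when free_after is set), and whoever drops the
         * count to zero releases the refcountptr; for writes with free_after the data
         * buffer is freed too. For asynchronous reads the caller keeps ownership of
         * tbuf and typically waits for completion via wait_for_reads(). */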

        /* Used for pipelined read */
        void launch_stream_reader(streaming_task * task) {
            pthread_t t;
            int ret = pthread_create(&t, NULL, stream_read_loop, (void*)task);
            assert(ret == 0);   // pthread_create returns 0 on success
        }


        bool pinned_session(int session) {
            return sessions[session]->pinned_to_memory != NULL;
        }

        void allow_preloading(std::string filename) {
            if (disable_preloading) {
                return;
            }
            preload_lock.lock();
            size_t filesize = get_filesize(filename);
            if (preloaded_bytes + filesize <= max_preload_bytes) {
                preloaded_bytes += filesize;
                m.set("preload_bytes", preloaded_bytes);

                pinned_file * pfile = new pinned_file();
                pfile->filename = filename;
                pfile->length = filesize;
                pfile->data = (uint8_t*) malloc(filesize);
                pfile->touched = false;
                assert(pfile->data != NULL);

                int fid = open(filename.c_str(), O_RDONLY);
                if (fid < 0) {
                    logstream(LOG_ERROR) << "Could not read file: " << filename
                        << " error: " << strerror(errno) << std::endl;
                }
                assert(fid >= 0);
                /* Preload the file */
                logstream(LOG_INFO) << "Preloading: " << filename << std::endl;
                preada(fid, pfile->data, filesize, 0);
                close(fid);
                preloaded_files.push_back(pfile);
            }
            preload_lock.unlock();
        }

        void commit_preloaded() {
            for(std::vector<pinned_file *>::iterator it=preloaded_files.begin();
                        it != preloaded_files.end(); ++it) {
                pinned_file * preloaded = (*it);
                if (preloaded->touched) {
                    logstream(LOG_INFO) << "Commit preloaded file: " << preloaded->filename << std::endl;
                    int fid = open(preloaded->filename.c_str(), O_WRONLY);
                    if (fid < 0) {
                        logstream(LOG_ERROR) << "Could not open file for writing: " << preloaded->filename
                            << " error: " << strerror(errno) << std::endl;
                        continue;
                    }
                    pwritea(fid, preloaded->data, preloaded->length, 0);
                    close(fid);
                }
                preloaded->touched = false;
            }
        }

        pinned_file * is_preloaded(std::string filename) {
            preload_lock.lock();
            pinned_file * preloaded = NULL;
            for(std::vector<pinned_file *>::iterator it=preloaded_files.begin();
                        it != preloaded_files.end(); ++it) {
                if (filename == (*it)->filename) {
                    preloaded = *it;
                    break;
                }
            }
            preload_lock.unlock();
            return preloaded;
        }


        // Note: if free_after is set, the data buffer is freed after the write completes!
        template <typename T>
        void pwritea_async(int session, T * tbuf, size_t nbytes, size_t off, bool free_after) {
            std::vector<stripe_chunk> stripelist = stripe_offsets(session, nbytes, off);
            refcountptr * refptr = new refcountptr((char*)tbuf, (int) stripelist.size());
            for(int i=0; i < (int)stripelist.size(); i++) {
                stripe_chunk chunk = stripelist[i];
                __sync_add_and_fetch(&thread_infos[chunk.mplex_thread]->pending_writes, 1);
                mplex_writetasks[chunk.mplex_thread].push(iotask(this, WRITE, sessions[session]->writedescs[chunk.mplex_thread],
                        refptr, chunk.len, chunk.offset + off, chunk.offset, free_after));
            }
        }

        template <typename T>
        void preada_now(int session, T * tbuf, size_t nbytes, size_t off) {
            metrics_entry me = m.start_time();

            if (multiplex > 1) {
                std::vector<stripe_chunk> stripelist = stripe_offsets(session, nbytes, off);
                size_t checklen = 0;
                refcountptr * refptr = new refcountptr((char*)tbuf, (int) stripelist.size());
                refptr->count++; // Take a reference so we can spin on it
                for(int i=0; i < (int)stripelist.size(); i++) {
                    stripe_chunk chunk = stripelist[i];
                    __sync_add_and_fetch(&thread_infos[chunk.mplex_thread]->pending_reads, 1);

                    // Use prioritized task queue
                    mplex_priotasks[chunk.mplex_thread].push(iotask(this, READ, sessions[session]->readdescs[chunk.mplex_thread],
                            refptr, chunk.len, chunk.offset + off, chunk.offset));
                    checklen += chunk.len;
                }
                assert(checklen == nbytes);

                // Spin until all chunks have completed
                while(refptr->count > 1) {
                    usleep(5000);
                }
                delete refptr;
            } else {
                preada(sessions[session]->readdescs[threads.size()], tbuf, nbytes, off);
            }
            m.stop_time(me, "preada_now", false);
        }

        template <typename T>
        void pwritea_now(int session, T * tbuf, size_t nbytes, size_t off) {
            metrics_entry me = m.start_time();
            std::vector<stripe_chunk> stripelist = stripe_offsets(session, nbytes, off);
            size_t checklen = 0;

            for(int i=0; i < (int)stripelist.size(); i++) {
                stripe_chunk chunk = stripelist[i];
                pwritea(sessions[session]->writedescs[chunk.mplex_thread], (char*)tbuf + chunk.offset, chunk.len, chunk.offset + off);
                checklen += chunk.len;
            }
            assert(checklen == nbytes);
            m.stop_time(me, "pwritea_now", false);
        }



        template <typename T>
        void managed_pwritea_async(int session, T ** tbuf, size_t nbytes, size_t off, bool free_after) {
            if (!pinned_session(session)) {
                pwritea_async(session, *tbuf, nbytes, off, free_after);
            } else {
                // Do nothing but mark the descriptor as 'dirty'
                sessions[session]->pinned_to_memory->touched = true;
            }
        }

        template <typename T>
        void managed_preada_now(int session, T ** tbuf, size_t nbytes, size_t off) {
            if (!pinned_session(session)) {
                preada_now(session, *tbuf, nbytes, off);
            } else {
                io_descriptor * iodesc = sessions[session];
                *tbuf = (T*) (iodesc->pinned_to_memory->data + off);
            }
        }

        template <typename T>
        void managed_pwritea_now(int session, T ** tbuf, size_t nbytes, size_t off) {
            if (!pinned_session(session)) {
                pwritea_now(session, *tbuf, nbytes, off);
            } else {
                // Do nothing but mark the descriptor as 'dirty'
                sessions[session]->pinned_to_memory->touched = true;
            }
        }

        template<typename T>
        void managed_malloc(int session, T ** tbuf, size_t nbytes, size_t noff) {
            if (!pinned_session(session)) {
                *tbuf = (T*) malloc(nbytes);
            } else {
                io_descriptor * iodesc = sessions[session];
                *tbuf = (T*) (iodesc->pinned_to_memory->data + noff);
            }
        }

        template <typename T>
        void managed_preada_async(int session, T ** tbuf, size_t nbytes, size_t off) {
            if (!pinned_session(session)) {
                preada_async(session, *tbuf, nbytes, off);
            } else {
                io_descriptor * iodesc = sessions[session];
                *tbuf = (T*) (iodesc->pinned_to_memory->data + off);
            }
        }

        template <typename T>
        void managed_release(int session, T ** ptr) {
            if (!pinned_session(session)) {
                assert(*ptr != NULL);
                free(*ptr);
            }
            *ptr = NULL;
        }


        void truncate(int session, size_t nbytes) {
            assert(!pinned_session(session));
            assert(multiplex <= 1); // We do not support truncating on multiplex yet
            int stat = ftruncate(sessions[session]->writedescs[0], nbytes);
            if (stat != 0) {
                logstream(LOG_ERROR) << "Could not truncate " << sessions[session]->filename <<
                    " error: " << strerror(errno) << std::endl;
                assert(false);
            }
        }

        void wait_for_reads() {
            metrics_entry me = m.start_time();
            int loops = 0;
            int mplex = (int) thread_infos.size();
            for(int i=0; i < mplex; i++) {
                while(thread_infos[i]->pending_reads > 0) {
                    usleep(10000);
                    loops++;
                }
            }
            m.stop_time(me, "stripedio_wait_for_reads", false);
        }

        void wait_for_writes() {
            metrics_entry me = m.start_time();
            int mplex = (int) thread_infos.size();
            for(int i=0; i < mplex; i++) {
                while(thread_infos[i]->pending_writes > 0) {
                    usleep(10000);
                }
            }
            m.stop_time(me, "stripedio_wait_for_writes", false);
        }


        std::string multiplexprefix(int stripe) {
            if (multiplex > 1) {
                char mstr[255];
                sprintf(mstr, "%d/", 1 + stripe % multiplex);
                return multiplex_root + std::string(mstr);
            } else return "";
        }
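
        /* Example: with multiplex == 3 and multiplex_root == "/mnt/stripes/", stripe 4
         * yields the prefix "/mnt/stripes/2/" (1 + 4 % 3 == 2). Striped copies of a file
         * are thus expected under subdirectories 1/ .. multiplex/ of multiplex_root. */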

        std::string multiplexprefix_random() {
            return multiplexprefix((int)random() % multiplex);
        }
    };
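

    /* A minimal usage sketch (illustrative only; it simply exercises the members
     * declared above and assumes an existing file "example.bin" and that metrics
     * can be constructed from a name string):
     *
     *   metrics m("stripedio-example");
     *   stripedio iomgr(m);
     *   int session = iomgr.open_session("example.bin", true);   // read-only
     *   size_t fsize = get_filesize("example.bin");
     *   char * buf = NULL;
     *   iomgr.managed_malloc(session, &buf, fsize, 0);
     *   iomgr.managed_preada_now(session, &buf, fsize, 0);       // blocking striped read
     *   // ... use buf ...
     *   iomgr.managed_release(session, &buf);
     *   iomgr.close_session(session);
     */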


    static void * io_thread_loop(void * _info) {
        iotask task;
        thrinfo * info = (thrinfo*)_info;
        logstream(LOG_INFO) << "Thread for multiplex: " << info->mplex << " starting." << std::endl;
        while(info->running) {
            bool success;
            if (info->pending_reads > 0) { // Prioritize read queue
                success = info->prioqueue->safepop(&task);
                if (!success) {
                    success = info->readqueue->safepop(&task);
                }
            } else {
                success = info->commitqueue->safepop(&task);
            }
            if (success) {
                if (task.action == WRITE) { // Write
                    metrics_entry me = info->m->start_time();

                    pwritea(task.fd, task.ptr->ptr + task.ptroffset, task.length, task.offset);
                    if (task.free_after) {
                        // Thread-safe method of memory management - ugly!
                        if (__sync_sub_and_fetch(&task.ptr->count, 1) == 0) {
                            free(task.ptr->ptr);      // data buffer was malloc()ed
                            delete task.ptr;          // refcountptr was allocated with new
                        }
                    }
                    __sync_sub_and_fetch(&info->pending_writes, 1);
                    info->m->stop_time(me, "commit_thr");
                } else {
                    preada(task.fd, task.ptr->ptr + task.ptroffset, task.length, task.offset);
                    __sync_sub_and_fetch(&info->pending_reads, 1);
                    if (__sync_sub_and_fetch(&task.ptr->count, 1) == 0) {
                        delete task.ptr;              // last reference releases the refcountptr
                    }
                }
            } else {
                usleep(50000); // 50 ms
            }
        }
        return NULL;
    }


    static void * stream_read_loop(void * _info) {
        streaming_task * task = (streaming_task*)_info;
        timeval start, end;
        gettimeofday(&start, NULL);
        size_t bufsize = 32 * 1024 * 1024; // 32 megs
        char * tbuf;

        if (task->iomgr->pinned_session(task->session)) {
            __sync_add_and_fetch(&task->curpos, task->len);
            return NULL;
        }
        tbuf = *task->buf;
        while(task->curpos < task->len) {
            size_t toread = std::min((size_t)task->len - (size_t)task->curpos, (size_t)bufsize);
            task->iomgr->preada_now(task->session, tbuf + task->curpos, toread, task->curpos);
            __sync_add_and_fetch(&task->curpos, toread);
        }

        gettimeofday(&end, NULL);

        return NULL;
    }


    static size_t get_filesize(std::string filename) {
        std::string fname = filename;
        int f = open(fname.c_str(), O_RDONLY);

        if (f < 0) {
            logstream(LOG_ERROR) << "Could not open file " << filename << " error: " << strerror(errno) << std::endl;
            assert(false);
        }

        off_t sz = lseek(f, 0, SEEK_END);
        close(f);
        return sz;
    }



}


#endif