GraphChi  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Macros
conversions.hpp
Go to the documentation of this file.
1 
28 #ifndef GRAPHCHI_CONVERSIONS_DEF
29 #define GRAPHCHI_CONVERSIONS_DEF
30 
31 #include <fcntl.h>
32 #include <unistd.h>
33 
34 #include "graphchi_types.hpp"
35 #include "logger/logger.hpp"
37 
42 #ifdef __GNUC__
43 #define VARIABLE_IS_NOT_USED __attribute__ ((unused))
44 #else
45 #define VARIABLE_IS_NOT_USED
46 #endif
47 
48 namespace graphchi {
49 
50  /* Simple string to number parsers */
51  static void VARIABLE_IS_NOT_USED parse(int &x, const char * s);
52  static void VARIABLE_IS_NOT_USED parse(unsigned int &x, const char * s);
53  static void VARIABLE_IS_NOT_USED parse(float &x, const char * s);
54  static void VARIABLE_IS_NOT_USED parse(long &x, const char * s);
55  static void VARIABLE_IS_NOT_USED parse(char &x, const char * s);
56  static void VARIABLE_IS_NOT_USED parse(bool &x, const char * s);
57  static void VARIABLE_IS_NOT_USED parse(double &x, const char * s);
58  static void VARIABLE_IS_NOT_USED parse(short &x, const char * s);
59  static void FIXLINE(char * s);
60 
/** Reads a decimal integer from the beginning of s (via atoi) and stores it in x. */
static void parse(int &x, const char * s) {
    const int value = atoi(s);
    x = value;
}
64 
/** Reads a base-10 unsigned number from s (via strtoul) and narrows it into x. */
static void parse(unsigned int &x, const char * s) {
    const unsigned long wide = strtoul(s, NULL, 10);
    x = static_cast<unsigned int>(wide);
}
68 
/** Reads a floating point number from s (via atof) and stores it in x as a float. */
static void parse(float &x, const char * s) {
    const double wide = atof(s);
    x = static_cast<float>(wide);
}
72 
73 
77  template <typename T>
78  void parse(PairContainer<T> &x, const char * s) {
79  parse(x.left, s);
80  parse(x.right, s);
81  }
82 
/** Reads a decimal long from the beginning of s (via atol) and stores it in x. */
static void parse(long &x, const char * s) {
    const long value = atol(s);
    x = value;
}
86 
/** Stores the first character of s in x; the rest of the string is ignored. */
static void parse(char &x, const char * s) {
    x = *s;
}
90 
/**
 * Sets x to true only when s parses (via atoi) to the integer 1; every other
 * input, including non-numeric text, yields false.
 */
static void parse(bool &x, const char * s) {
    const int flag = atoi(s);
    x = (flag == 1);
}
94 
/** Reads a floating point number from s (via atof) and stores it in x. */
static void parse(double &x, const char * s) {
    const double value = atof(s);
    x = value;
}
98 
/** Reads a decimal integer from s (via atoi) and narrows it into the short x. */
static void parse(short &x, const char * s) {
    const int wide = atoi(s);
    x = static_cast<short>(wide);
}
102 
103  // Catch all
104  template <typename T>
105  void parse(T &x, const char * s) {
106  logstream(LOG_FATAL) << "You need to define parse<your-type>(your-type &x, const char *s) function"
107  << " to support parsing the edge value." << std::endl;
108  assert(false);
109  }
110 
111 
112 
/**
 * Removes a single trailing '\n' from the NUL-terminated string s, in place.
 * Strings that do not end in '\n' (including the empty string) are left
 * untouched.
 *
 * @param s writable, NUL-terminated line buffer
 */
void FIXLINE(char * s) {
    const size_t len = strlen(s);
    // Guard the empty string: with len == 0 there is no last character and
    // indexing s[len - 1] would read in front of the buffer.
    if (len > 0 && s[len - 1] == '\n') {
        s[len - 1] = 0;
    }
}
118 
119 
124  template <typename EdgeDataType>
125  void convert_edgelist(std::string inputfile, sharder<EdgeDataType> &sharderobj) {
126  FILE * inf = fopen(inputfile.c_str(), "r");
127 
128  if (inf == NULL) {
129  logstream(LOG_FATAL) << "Could not load :" << inputfile << " error: " << strerror(errno) << std::endl;
130  }
131  assert(inf != NULL);
132 
133  logstream(LOG_INFO) << "Reading in edge list format!" << std::endl;
134  char s[1024];
135  while(fgets(s, 1024, inf) != NULL) {
136  FIXLINE(s);
137  if (s[0] == '#') continue; // Comment
138  if (s[0] == '%') continue; // Comment
139 
140  char delims[] = "\t ";
141  char * t;
142  t = strtok(s, delims);
143  vid_t from = atoi(t);
144  t = strtok(NULL, delims);
145  vid_t to = atoi(t);
146 
147  /* Check if has value */
148  t = strtok(NULL, delims);
149  EdgeDataType val;
150  if (t != NULL) {
151  parse(val, (const char*) t);
152  } else {
153  val = EdgeDataType();
154  }
155  if (from != to) {
156  sharderobj.preprocessing_add_edge(from, to, val);
157  }
158  }
159  fclose(inf);
160  }
161 
166  template <typename EdgeDataType>
167  void convert_adjlist(std::string inputfile, sharder<EdgeDataType> &sharderobj) {
168  FILE * inf = fopen(inputfile.c_str(), "r");
169  if (inf == NULL) {
170  logstream(LOG_FATAL) << "Could not load :" << inputfile << " error: " << strerror(errno) << std::endl;
171  }
172  assert(inf != NULL);
173  logstream(LOG_INFO) << "Reading in adjacency list format!" << std::endl;
174 
175  int maxlen = 100000000;
176  char * s = (char*) malloc(maxlen);
177 
178  char delims[] = " \t";
179  size_t linenum = 0;
180  /*** PHASE 1 - count ***/
181  while(fgets(s, maxlen, inf) != NULL) {
182  linenum++;
183  FIXLINE(s);
184  if (s[0] == '#') continue; // Comment
185  if (s[0] == '%') continue; // Comment
186  char * t = strtok(s, delims);
187  vid_t from = atoi(t);
188  t = strtok(NULL,delims);
189  if (t != NULL) {
190  vid_t num = atoi(t);
191  vid_t i = 0;
192  while((t = strtok(NULL,delims)) != NULL) {
193  vid_t to = atoi(t);
194  if (from != to) {
195  sharderobj.preprocessing_add_edge(from, to, EdgeDataType());
196  }
197  i++;
198  }
199  if (num != i)
200  logstream(LOG_ERROR) << "Mismatch when reading adjacency list: " << num << " != " << i << " s: " << std::string(s)
201  << " on line: " << linenum << std::endl;
202  assert(num == i);
203  }
204  }
205  free(s);
206  }
207 
208 
/**
 * Interface for an optional step that rewrites the preprocessed edge file
 * before it is sharded. convert() appends getSuffix() to the base filename
 * when naming the sharder output, and calls reprocess() once preprocessing
 * has finished.
 */
template <typename EdgeDataType>
class SharderPreprocessor {
public:
    virtual ~SharderPreprocessor() {}

    /** @return suffix appended to the base filename of the processed graph */
    virtual std::string getSuffix() = 0;

    /**
     * Rewrites the preprocessed file.
     * @param preprocFilename path of the preprocessed file to rewrite
     * @param basefileName    base filename of the original input graph
     */
    virtual void reprocess(std::string preprocFilename, std::string basefileName) = 0;
};
221 
226  template <typename EdgeDataType>
227  int convert(std::string basefilename, std::string nshards_string, SharderPreprocessor<EdgeDataType> * preprocessor = NULL) {
228  std::string suffix = "";
229  if (preprocessor != NULL) {
230  suffix = preprocessor->getSuffix();
231  }
232  sharder<EdgeDataType> sharderobj(basefilename + suffix);
233 
234  if (!sharderobj.preprocessed_file_exists()) {
235  std::string file_type_str = get_option_string_interactive("filetype", "edgelist, adjlist");
236  if (file_type_str != "adjlist" && file_type_str != "edgelist") {
237  logstream(LOG_ERROR) << "You need to specify filetype: 'edgelist' or 'adjlist'." << std::endl;
238  assert(false);
239  }
240 
241  /* Start preprocessing */
242  sharderobj.start_preprocessing();
243 
244  if (file_type_str == "adjlist") {
245  convert_adjlist<EdgeDataType>(basefilename, sharderobj);
246  } else if (file_type_str == "edgelist") {
247  convert_edgelist<EdgeDataType>(basefilename, sharderobj);
248  }
249 
250  /* Finish preprocessing */
251  sharderobj.end_preprocessing();
252 
253  if (preprocessor != NULL) {
254  preprocessor->reprocess(sharderobj.preprocessed_name(), basefilename);
255  }
256 
257  }
258 
259  int nshards = sharderobj.execute_sharding(nshards_string);
260  logstream(LOG_INFO) << "Successfully finished sharding for " << basefilename + suffix << std::endl;
261  logstream(LOG_INFO) << "Created " << nshards << " shards." << std::endl;
262  return nshards;
263  }
264 
265 
266  template <typename EdgeDataType>
267  int convert_if_notexists(std::string basefilename, std::string nshards_string, SharderPreprocessor<EdgeDataType> * preprocessor = NULL) {
268  int nshards;
269  std::string suffix = "";
270  if (preprocessor != NULL) {
271  suffix = preprocessor->getSuffix();
272  }
273 
274  /* Check if input file is already sharded */
275  if ((nshards = find_shards<EdgeDataType>(basefilename + suffix, nshards_string))) {
276  logstream(LOG_INFO) << "Found preprocessed files for " << basefilename << ", num shards=" << nshards << std::endl;
277  return nshards;
278  }
279  logstream(LOG_INFO) << "Did not find preprocessed shards for " << basefilename + suffix << std::endl;
280  logstream(LOG_INFO) << "Will try create them now..." << std::endl;
281  nshards = convert<EdgeDataType>(basefilename, nshards_string, preprocessor);
282  return nshards;
283  }
284 
291  struct vertex_degree {
292  int deg;
293  vid_t id;
294  vertex_degree() {}
295  vertex_degree(int deg, vid_t id) : deg(deg), id(id) {}
296  };
297 
298  bool vertex_degree_less(const vertex_degree &a, const vertex_degree &b);
299  bool vertex_degree_less(const vertex_degree &a, const vertex_degree &b) {
300  return a.deg < b.deg || (a.deg == b.deg && a.id < b.id);
301  }
302 
307  template <typename EdgeDataType>
308  class OrderByDegree : public SharderPreprocessor<EdgeDataType> {
309 
310  public:
312  vid_t * translate_table;
313  vid_t max_vertex_id;
314 
315  std::string getSuffix() {
316  return "_degord";
317  }
318 
319  vid_t translate(vid_t vid) {
320  if (vid >= max_vertex_id) return vid;
321  return translate_table[vid];
322  }
323 
324  void reprocess(std::string preprocessedFile, std::string baseFilename) {
325  size_t blocksize = 32 * 1024 * 1024;
326  while (blocksize % sizeof(edge_t)) blocksize++;
327 
328  char * block = (char*) malloc(blocksize);
329  size_t total_to_process = get_filesize(preprocessedFile);
330 
331  FILE * inf = fopen(preprocessedFile.c_str(), "r");
332  if (inf == NULL) {
333  logstream(LOG_ERROR) << "Could not open: " << preprocessedFile << " error: " << strerror(errno) << std::endl;
334  }
335  assert(inf != NULL);
336  fread(&max_vertex_id, sizeof(vid_t), 1, inf);;
337  vertex_degree * degarray = (vertex_degree *) calloc(max_vertex_id + 1, sizeof(vertex_degree));
338  vid_t nverts = max_vertex_id + 1;
339  for(vid_t i=0; i < nverts; i++) {
340  degarray[i].id = i;
341  }
342 
343  size_t totread = 0;
344  do {
345  size_t len = 0;
346  while(len < blocksize) {
347  int a = (int) fread(block + len, 1, blocksize - len, inf);
348  len += a;
349  if (a <= 0) break; // eof
350  }
351  totread += len;
352 
353  logstream(LOG_DEBUG) << "Degree ordering -- read:" << (totread * 1.0 / total_to_process * 100) << "%" << std::endl;
354  len /= sizeof(edge_t);
355  edge_t * ptr = (edge_t*)block;
356 
357  for(int i=0; i<(int)len; i++) {
358  degarray[ptr[i].src].deg++;
359  degarray[ptr[i].dst].deg++;
360  }
361  } while (!feof(inf));
362  fclose(inf);
363 
364  /* Now sort */
365  quickSort(degarray, nverts, vertex_degree_less);
366 
367  /* Create translation table */
368  translate_table = (vid_t*) calloc(sizeof(vid_t), nverts);
369  for(vid_t i=0; i<nverts; i++) {
370  translate_table[degarray[i].id] = i;
371  }
372  delete degarray;
373 
374  /* Write translate table */
375  std::string translate_table_file = baseFilename + ".vertexmap";
376  int df = open(translate_table_file.c_str(), O_RDWR | O_CREAT, S_IROTH | S_IWOTH | S_IWUSR | S_IRUSR);
377  if (df < 0) logstream(LOG_ERROR) << "Could not write vertex map: " << translate_table_file <<
378  " error: " << strerror(errno) << std::endl;
379  assert(df >= 0);
380  pwrite(df, translate_table, nverts, 0);
381  close(df);
382 
383  /* Now recreate the processed file */
384  std::string tmpfilename = preprocessedFile + ".old";
385  rename(preprocessedFile.c_str(), tmpfilename.c_str());
386 
387  inf = fopen(tmpfilename.c_str(), "r");
388  if (inf == NULL) {
389  logstream(LOG_ERROR) << "Could not open: " << tmpfilename << " error: " << strerror(errno) << std::endl;
390  }
391  assert(inf != NULL);
392  fread(&max_vertex_id, sizeof(vid_t), 1, inf);;
393 
394  FILE * outf = fopen(preprocessedFile.c_str(), "w");
395  if (outf == NULL) {
396  logstream(LOG_ERROR) << "Could not open: " << preprocessedFile << " error: " << strerror(errno) << std::endl;
397  }
398  assert(outf != NULL);
399  fwrite(&max_vertex_id, sizeof(vid_t), 1, outf);
400 
401  totread = 0;
402  do {
403  size_t len = 0;
404  while(len < blocksize) {
405  int a = (int) fread(block + len, 1, blocksize - len, inf);
406  len += a;
407  if (a <= 0) break; // eof
408  }
409  totread += len;
410 
411  logstream(LOG_DEBUG) << "Degree ordering -- write/read:" << (totread * 1.0 / total_to_process * 100) << "%" << std::endl;
412  len /= sizeof(edge_t);
413  edge_t * ptr = (edge_t*)block;
414 
415  // Todo: use buffered output
416  for(int i=0; i<(int)len; i++) {
417  ptr[i].src = translate(ptr[i].src);
418  ptr[i].dst = translate(ptr[i].dst);
419 
420  fwrite(&ptr[i], sizeof(edge_t), 1, outf);
421  }
422  } while (!feof(inf));
423 
424  fclose(inf);
425  fclose(outf);
426 
427  delete translate_table;
428  free(block);
429  }
430 
431  };
432 
433 } // end namespace
434 
435 #endif
436