GraphChi  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Macros
sharder.hpp
Go to the documentation of this file.
1 
35 #ifndef GRAPHCHI_SHARDER_DEF
36 #define GRAPHCHI_SHARDER_DEF
37 
38 
39 #include <iostream>
40 #include <cstdio>
41 #include <fcntl.h>
42 #include <unistd.h>
43 #include <vector>
44 #include <omp.h>
45 #include <errno.h>
46 #include <sstream>
47 #include <string>
48 
49 #include "graphchi_types.hpp"
50 #include "api/chifilenames.hpp"
51 #include "api/graphchi_context.hpp"
52 #include "io/stripedio.hpp"
53 #include "shards/slidingshard.hpp"
54 #include "shards/memoryshard.hpp"
55 #include "logger/logger.hpp"
56 #include "util/ioutil.hpp"
57 #include "util/qsort.hpp"
58 #include "metrics/metrics.hpp"
60 
61 namespace graphchi {
62 
63 #define SHARDER_BUFSIZE (64 * 1024 * 1024)
64 
 /* Sharding makes two passes over the preprocessed edge file: first the
    vertex intervals are computed (COMPUTE_INTERVALS), then the edges are
    "shoveled" into per-shard temporary files (SHOVEL). */
 enum ProcPhase { COMPUTE_INTERVALS=1, SHOVEL=2 };
66 
67 
68  template <typename EdgeDataType>
69  struct edge_with_value {
70  vid_t src;
71  vid_t dst;
72  EdgeDataType value;
73 
74  edge_with_value(vid_t src, vid_t dst, EdgeDataType value) : src(src), dst(dst), value(value) {}
75 
76  bool stopper() { return src == 0 && dst == 0; }
77  };
78 
79  template <typename EdgeDataType>
80  bool edge_t_src_less(const edge_with_value<EdgeDataType> &a, const edge_with_value<EdgeDataType> &b) {
81  if (a.src == b.src) return a.dst < b.dst;
82  return a.src < b.src;
83  }
84 
85  template <typename EdgeDataType>
86  class sharder {
87 
89 
90  protected:
91  std::string basefilename;
92 
93  int binfile_fd;
94  vid_t max_vertex_id;
95 
96  /* Preprocessing */
97  char * prebuf;
98  char * prebufptr;
99 
100  size_t bytes_preprocessed;
101 
102  /* Sharding */
103  int nshards;
104  std::vector< std::pair<vid_t, vid_t> > intervals;
105  int phase;
106 
107  int * edgecounts;
108  int vertexchunk;
109  size_t nedges;
110  std::string prefix;
111 
112  int * shovelfs;
113 
114  edge_t ** bufs;
115  int * bufptrs;
116  size_t bufsize;
117  size_t edgedatasize;
118 
119  metrics m;
120 
121  public:
122 
123  sharder(std::string basefilename) : basefilename(basefilename), m("sharder") {
124  binfile_fd = (-1);
125  prebuf = NULL;
126  bufs = NULL;
127  edgedatasize = sizeof(EdgeDataType);
128  }
129 
130 
131  virtual ~sharder() {
132  if (prebuf != NULL) free(prebuf);
133  prebuf = NULL;
134  }
135 
136  std::string preprocessed_name() {
137  std::stringstream ss;
138  ss << basefilename;
139  ss << "." << sizeof(EdgeDataType) << "B.bin";
140  return ss.str();
141  }
142 
     /* Existence probe: try to open the preprocessed file read-only;
        success means preprocessing has already been completed. */
     int f = open(preprocessed_name().c_str(), O_RDONLY);
     if (f >= 0) {
         close(f);  // only existence matters; do not keep the descriptor
         return true;
     } else {
         return false;
     }
 }
156 
     /* prebuf is allocated only here, so a non-NULL prebuf means
        start_preprocessing() was already called. */
     if (prebuf != NULL) {
         logstream(LOG_FATAL) << "start_preprocessing() already called! Aborting." << std::endl;
     }

     m.start_time("preprocessing");
     assert(prebuf == NULL);
     /* Write into a ".tmp" file first; it is renamed to the final name only
        when preprocessing completes, so a partial file is never mistaken
        for a finished one (see the rename at the end of preprocessing). */
     std::string tmpfilename = preprocessed_name() + ".tmp";
     binfile_fd = open(tmpfilename.c_str(), O_WRONLY | O_CREAT, S_IROTH | S_IWOTH | S_IWUSR | S_IRUSR);
     if (binfile_fd < 0) {
         logstream(LOG_ERROR) << "Could not create file: " << tmpfilename << " error: " << strerror(errno) << std::endl;
     }
     assert(binfile_fd >= 0);

     /* Initialize buffers */
     prebuf = (char*) malloc(SHARDER_BUFSIZE);
     prebufptr = prebuf;
     assert(prebuf != NULL);

     logstream(LOG_INFO) << "Started preprocessing: " << basefilename << " --> " << tmpfilename << std::endl;

     /* Write the maximum vertex id place holder - to be filled later */
     bwrite(binfile_fd, prebuf, prebufptr, (vid_t)0);
     max_vertex_id = 0;
     bytes_preprocessed = 0;
 }
186 
     /* start_preprocessing() must have been called first. */
     assert(prebuf != NULL);

     /* Flush buffer to disk */
     writea(binfile_fd, prebuf, prebufptr - prebuf);
     bytes_preprocessed += prebufptr - prebuf;
     logstream(LOG_INFO) << "Preprocessed: " << float(bytes_preprocessed / 1024. / 1024.) << " MB." << std::endl;

     /* Write the max vertex id byte */
     /* Overwrites the (vid_t)0 placeholder written at the start of the file
        by start_preprocessing(). */
     assert(max_vertex_id > 0);
     logstream(LOG_INFO) << "Maximum vertex id: " << max_vertex_id << std::endl;
     ssize_t written = pwrite(binfile_fd, &max_vertex_id, sizeof(vid_t), 0);
     if (written != sizeof(vid_t)) {
         logstream(LOG_ERROR) << "Error when writing preprocessed file: " << strerror(errno) << std::endl;
     }
     assert(written == sizeof(vid_t));

     /* Free buffer's memory */
     free(prebuf);
     prebuf = NULL;
     prebufptr = NULL;

     /* NOTE(review): binfile_fd is not closed here before the rename —
        verify it is closed elsewhere, or that leaving it open until object
        destruction is acceptable on the target platforms. */
     /* Rename temporary file */
     std::string tmpfilename = preprocessed_name() + ".tmp";
     rename(tmpfilename.c_str(), preprocessed_name().c_str());

     assert(preprocessed_file_exists());
     logstream(LOG_INFO) << "Finished preprocessing: " << basefilename << " --> " << preprocessed_name() << std::endl;
     m.stop_time("preprocessing");
 }
220 
224  void preprocessing_add_edge(vid_t from, vid_t to, EdgeDataType val) {
225  if (prebuf == NULL) {
226  logstream(LOG_FATAL) << "You need to call start_preprocessing() prior to adding any edges!" << std::endl;
227  }
228  assert(prebuf != NULL);
229 
230  bwrite(binfile_fd, prebuf, prebufptr, edge_t(from, to, val));
231  max_vertex_id = std::max(std::max(from, to), max_vertex_id);
232  }
233 
235  template <typename T>
236  void bwrite(int f, char * buf, char * &bufptr, T val) {
237  if (bufptr + sizeof(T) - buf >= SHARDER_BUFSIZE) {
238  writea(f, buf, bufptr - buf);
239  bufptr = buf;
240 
241  if (buf == prebuf) { // Slightly ugly.
242  bytes_preprocessed += SHARDER_BUFSIZE;
243  logstream(LOG_INFO)<< "Preprocessed: " << float(bytes_preprocessed / 1024. / 1024.) << " MB." << std::endl;
244  }
245  }
246  *((T*)bufptr) = val;
247  bufptr += sizeof(T);
248  }
249 
250 
251 
252 
253 
 /**
  * Runs the sharding: reads the preprocessed edge file once per phase
  * (COMPUTE_INTERVALS, then SHOVEL), feeding every edge to receive_edge(),
  * and finally writes out the shard files. Returns the number of shards.
  */
 int execute_sharding(std::string nshards_string) {
     m.start_time("execute_sharding");
     determine_number_of_shards(nshards_string);

     /* Round the read-block size up to a multiple of the edge record size
        so records never straddle a block boundary. */
     size_t blocksize = 32 * 1024 * 1024;
     while (blocksize % sizeof(edge_t)) blocksize++;

     char * block = (char*) malloc(blocksize);
     size_t total_to_process = get_filesize(preprocessed_name());

     for(int phase=1; phase <= 2; ++phase) {
         /* Start the sharding process */
         FILE * inf = fopen(preprocessed_name().c_str(), "r");
         if (inf == NULL) {
             logstream(LOG_FATAL) << "Could not open preprocessed file: " << preprocessed_name() <<
                 " error: " << strerror(errno) << std::endl;
         }
         assert(inf != NULL);

         /* Read max vertex id (stored as a vid_t header at offset 0). */
         ssize_t rd = fread(&max_vertex_id, sizeof(vid_t), 1, inf);
         if (rd != 1) {
             logstream(LOG_ERROR) << "Read: " << rd << std::endl;
             logstream(LOG_ERROR) << "Could not read from preprocessed file. Error: " << strerror(errno) << std::endl;
         }
         assert(rd == 1);
         logstream(LOG_INFO) << "Max vertex id: " << max_vertex_id << std::endl;

         this->start_phase(phase);

         size_t totread = 0;
         do {
             size_t len = 0;
             /* Fill the block completely; fread may return short reads. */
             while(len < blocksize) {
                 int a = (int) fread(block + len, 1, blocksize - len, inf);
                 len += a;
                 if (a <= 0) break; // eof
             }
             totread += len;

             logstream(LOG_DEBUG) << "Phase: " << phase << " read:"
                 << (totread * 1.0 / total_to_process * 100) << "%" << std::endl;
             len /= sizeof(edge_t);
             edge_t * ptr = (edge_t*)block;

             /* Hand every edge in the block to the current phase's handler. */
             for(size_t i=0; i<len; i++) {
                 this->receive_edge(ptr[i].src, ptr[i].dst, ptr[i].value);
             }
         } while (!feof(inf));
         this->end_phase();

         fclose(inf);
     }

     /* Release memory */
     free(block);
     block = NULL;

     /* Write the shards */
     write_shards();

     m.stop_time("execute_sharding");

     /* Print metrics */
     basic_reporter basicrep;
     m.report(basicrep);

     return nshards;
 }
327 
331  protected:
332 
333  virtual void determine_number_of_shards(std::string nshards_string) {
334  assert(preprocessed_file_exists());
335  if (nshards_string.find("auto") != std::string::npos || nshards_string == "0") {
336  logstream(LOG_INFO) << "Determining number of shards automatically." << std::endl;
337 
338  int membudget_mb = get_option_int("membudget_mb", 1024);
339  logstream(LOG_INFO) << "Assuming available memory is " << membudget_mb << " megabytes. " << std::endl;
340  logstream(LOG_INFO) << " (This can be defined with configuration parameter 'membudget_mb')" << std::endl;
341 
342  bytes_preprocessed = get_filesize(preprocessed_name()) - sizeof(vid_t);
343  assert(bytes_preprocessed > 0);
344  size_t numedges = bytes_preprocessed / sizeof(edge_t);
345 
346  double max_shardsize = membudget_mb * 1024. * 1024. / 8;
347  logstream(LOG_INFO) << "Determining maximum shard size: " << (max_shardsize / 1024. / 1024.) << " MB." << std::endl;
348 
349  nshards = (int) ( 2 + (numedges * sizeof(EdgeDataType) / max_shardsize) + 0.5);
350  } else {
351  nshards = atoi(nshards_string.c_str());
352  }
353 
354  logstream(LOG_INFO) << "Number of shards to be created: " << nshards << std::endl;
355  assert(nshards > 1);
356  }
357 
358  void compute_partitionintervals() {
359  size_t edges_per_part = nedges / nshards + 1;
360 
361  logstream(LOG_INFO) << "Number of shards: " << nshards << std::endl;
362  logstream(LOG_INFO) << "Edges per shard: " << edges_per_part << std::endl;
363  logstream(LOG_INFO) << "Max vertex id: " << max_vertex_id << std::endl;
364 
365  vid_t cur_st = 0;
366  size_t edgecounter=0;
367  std::string fname = filename_intervals(basefilename, nshards);
368  FILE * f = fopen(fname.c_str(), "w");
369 
370  if (f == NULL) {
371  logstream(LOG_ERROR) << "Could not open file: " << fname << " error: " <<
372  strerror(errno) << std::endl;
373  }
374  assert(f != NULL);
375 
376  vid_t i = 0;
377  while(nshards > (int) intervals.size()) {
378  i += vertexchunk;
379  edgecounter += edgecounts[i / vertexchunk];
380  if (edgecounter >= edges_per_part || (i >= max_vertex_id)) {
381  intervals.push_back(std::pair<vid_t,vid_t>(cur_st, i + (i >= max_vertex_id)));
382  logstream(LOG_INFO) << "Interval: " << cur_st << " - " << i << std::endl;
383  fprintf(f, "%u\n", i + (i == max_vertex_id));
384  cur_st = i + 1;
385  edgecounter = 0;
386  }
387  }
388  fclose(f);
389  assert(nshards == (int)intervals.size());
390 
391  logstream(LOG_INFO) << "Computed intervals." << std::endl;
392  }
393 
394  std::string shovel_filename(int shard) {
395  std::stringstream ss;
396  ss << basefilename << shard << "." << nshards << ".shovel";
397  return ss.str();
398  }
399 
 /**
  * Allocates the state needed by processing phase p (see ProcPhase).
  * COMPUTE_INTERVALS allocates the bucketed degree counters; SHOVEL opens
  * one temporary file and one in-memory buffer per shard.
  */
 void start_phase(int p) {
     phase = p;
     lastpart = 0;
     logstream(LOG_INFO) << "Starting phase: " << phase << std::endl;
     switch (phase) {
         case COMPUTE_INTERVALS:
             /* To compute the intervals, we need to keep track of the vertex degrees.
                If there is not enough memory to store degree for each vertex, we combine
                degrees of successive vertice. This results into less accurate shard split,
                but in practice it hardly matters. */
             vertexchunk = (int) (max_vertex_id * sizeof(int) / (1024 * 1024 * get_option_long("membudget_mb", 1024)));
             if (vertexchunk<1) vertexchunk = 1;
             edgecounts = (int*)calloc( max_vertex_id / vertexchunk + 1, sizeof(int));
             nedges = 0;
             break;

         case SHOVEL:
             /* A quarter of the memory budget is split evenly among the
                per-shard shovel buffers. */
             shovelfs = new int[nshards];
             bufs = new edge_t*[nshards];
             bufptrs = new int[nshards];
             bufsize = (1024 * 1024 * get_option_long("membudget_mb", 1024)) / nshards / 4;
             /* Round up so whole edge records always fit in the buffer. */
             while(bufsize % sizeof(edge_t) != 0) bufsize++;

             logstream(LOG_DEBUG)<< "Shoveling bufsize: " << bufsize << std::endl;

             for(int i=0; i < nshards; i++) {
                 std::string fname = shovel_filename(i);
                 shovelfs[i] = open(fname.c_str(), O_WRONLY | O_CREAT, S_IROTH | S_IWOTH | S_IWUSR | S_IRUSR);
                 if (shovelfs[i] < 0) {
                     logstream(LOG_ERROR) << "Could not create a temporary file " << fname <<
                         " error: " << strerror(errno) << std::endl;
                 }
                 assert(shovelfs[i] >= 0);
                 bufs[i] = (edge_t*) malloc(bufsize);
                 bufptrs[i] = 0;
             }
             break;
     }
 }
439 
440  void end_phase() {
441  logstream(LOG_INFO) << "Ending phase: " << phase << std::endl;
442  switch (phase) {
443  case COMPUTE_INTERVALS:
444  compute_partitionintervals();
445  free(edgecounts);
446  edgecounts = NULL;
447  break;
448  case SHOVEL:
449  for(int i=0; i<nshards; i++) {
450  writea(shovelfs[i], bufs[i], sizeof(edge_t) * (bufptrs[i]));
451  close(shovelfs[i]);
452  free(bufs[i]);
453  }
454  free(bufs);
455  free(bufptrs);
456  break;
457  }
458  }
459 
460 
461  int lastpart;
462 
463  void swrite(int shard, edge_t et) {
464  bufs[shard][bufptrs[shard]++] = et;
465  if (bufptrs[shard] * sizeof(edge_t) >= bufsize) {
466  writea(shovelfs[shard], bufs[shard], sizeof(edge_t) * bufptrs[shard]);
467  bufptrs[shard] = 0;
468  }
469 
470  }
471 
 /**
  * Handles one edge from the preprocessed file according to the current
  * phase: COMPUTE_INTERVALS counts in-degrees per vertex bucket; SHOVEL
  * routes the edge to the shard whose interval contains the destination.
  * Self-edges are dropped with a warning.
  */
 void receive_edge(vid_t from, vid_t to, EdgeDataType value) {
     if (to == from) {
         logstream(LOG_WARNING) << "Tried to add self-edge " << from << "->" << to << std::endl;
         return;
     }
     if (from > max_vertex_id || to > max_vertex_id) {
         logstream(LOG_ERROR) << "Tried to add an edge with too large from/to values. From:" <<
             from << " to: "<< to << " max: " << max_vertex_id << std::endl;
         assert(false);
     }
     switch (phase) {
         case COMPUTE_INTERVALS:
             /* Bucketed in-degree count; bucket width is vertexchunk. */
             edgecounts[to / vertexchunk]++;
             nedges++;
             break;
         case SHOVEL:
             bool found=false;
             /* Start the interval search at the shard that received the
                previous edge, since consecutive edges often share it. */
             for(int i=0; i < nshards; i++) {
                 int shard = (lastpart + i) % nshards;
                 if (to >= intervals[shard].first && to <= intervals[shard].second) {
                     swrite(shard, edge_t(from, to, value));
                     lastpart = shard; // Small optimizations, which works if edges are in order for each vertex - not much though
                     found = true;
                     break;
                 }
             }
             if(!found) {
                 logstream(LOG_ERROR) << "Shard not found for : " << to << std::endl;
             }
             assert(found);
             break;
     }
 }
505 
506 
507 
 /**
  * Converts each shovel file into its final shard files: sorts the edges
  * by source vertex, writes the adjacency shard in GraphChi's compact
  * format, and writes the edge-data shard as a flat array of values.
  * Finally creates the vertex degree file.
  *
  * Adjacency format, per source vertex: an out-edge count (one byte;
  * 0xff escapes to a 4-byte count for counts >= 255) followed by the
  * destination vertex ids. Runs of vertices with no out-edges are encoded
  * as a zero byte followed by the run length (up to 254 per pair).
  */
 virtual void write_shards() {
     for(int shard=0; shard < nshards; shard++) {
         logstream(LOG_INFO) << "Starting final processing for shard: " << shard << std::endl;

         std::string shovelfname = shovel_filename(shard);
         std::string fname = filename_shard_adj(basefilename, shard, nshards);
         std::string edfname = filename_shard_edata<EdgeDataType>(basefilename, shard, nshards);

         /* Read the whole shovel file into memory and sort by source. */
         edge_t * shovelbuf;
         int shovelf = open(shovelfname.c_str(), O_RDONLY);
         size_t shovelsize = readfull(shovelf, (char**) &shovelbuf);
         size_t numedges = shovelsize / sizeof(edge_t);

         logstream(LOG_DEBUG) << "Shovel size:" << shovelsize << " edges: " << numedges << std::endl;

         quickSort(shovelbuf, (int)numedges, edge_t_src_less<EdgeDataType>);

         // Create the final file
         int f = open(fname.c_str(), O_WRONLY | O_CREAT, S_IROTH | S_IWOTH | S_IWUSR | S_IRUSR);
         if (f < 0) {
             logstream(LOG_ERROR) << "Could not open " << fname << " error: " << strerror(errno) << std::endl;
         }
         assert(f >= 0);
         int trerr = ftruncate(f, 0);
         assert(trerr == 0);

         /* Create edge data file */
         int ef = open(edfname.c_str(), O_WRONLY | O_CREAT, S_IROTH | S_IWOTH | S_IWUSR | S_IRUSR);
         if (ef < 0) {
             logstream(LOG_ERROR) << "Could not open " << edfname << " error: " << strerror(errno) << std::endl;

         }
         assert(ef >= 0);

         char * buf = (char*) malloc(SHARDER_BUFSIZE);
         char * bufptr = buf;
         char * ebuf = (char*) malloc(SHARDER_BUFSIZE);
         char * ebufptr = ebuf;

         vid_t curvid=0;
         size_t istart = 0;  // index of the first edge of the current source vertex
         for(size_t i=0; i <= numedges; i++) {
             edge_t edge = (i < numedges ? shovelbuf[i] : edge_t(0, 0, EdgeDataType())); // Last "element" is a stopper
             /* Edge values go to the edge-data file in the same order as
                the adjacency lists. */
             bwrite<EdgeDataType>(ef, ebuf, ebufptr, EdgeDataType(edge.value));

             if ((edge.src != curvid)) {
                 // New vertex
                 /* Emit the edge list of the source vertex that just ended:
                    edges [istart, i). */
                 size_t count = i - istart;
                 assert(count>0 || curvid==0);
                 if (count>0) {
                     if (count < 255) {
                         uint8_t x = (uint8_t)count;
                         bwrite<uint8_t>(f, buf, bufptr, x);
                     } else {
                         /* 0xff marks a following 4-byte count. */
                         bwrite<uint8_t>(f, buf, bufptr, 0xff);
                         bwrite<uint32_t>(f, buf, bufptr,(uint32_t)count);
                     }
                 }

                 for(size_t j=istart; j<i; j++) {
                     bwrite(f, buf, bufptr, shovelbuf[j].dst);
                 }

                 istart = i;

                 // Handle zeros
                 /* Encode the run of vertices between curvid and edge.src
                    that have no out-edges. */
                 if (!edge.stopper()) {
                     if (edge.src - curvid > 1 || (i == 0 && edge.src>0)) {
                         int nz = edge.src-curvid-1;
                         if (i == 0 && edge.src>0) nz = edge.src; // border case with the first one
                         do {
                             bwrite<uint8_t>(f, buf, bufptr, 0);
                             nz--;
                             int tnz = std::min(254, nz);
                             bwrite<uint8_t>(f, buf, bufptr, (uint8_t) tnz);
                             nz -= tnz;
                         } while (nz>0);
                     }
                 }
                 curvid = edge.src;
             }
         }

         /* Flush buffers and free memory */
         writea(f, buf, bufptr - buf);
         free(buf);
         free(shovelbuf);
         close(f);
         close(shovelf);

         writea(ef, ebuf, ebufptr - ebuf);
         close(ef);

         free(ebuf);
         /* The shovel file is no longer needed. */
         remove(shovelfname.c_str());

     }

     create_degree_file();
 }
613 
614 
615  typedef char dummy_t;
616 
617  typedef sliding_shard<int, dummy_t> slidingshard_t;
618  typedef memory_shard<int, dummy_t> memshard_t;
619 
620 
621  void create_degree_file() {
622  // Initialize IO
623  stripedio * iomgr = new stripedio(m);
624  std::vector<slidingshard_t * > sliding_shards;
625 
626  int subwindow = 5000000;
627  std::cout << "Subwindow : " << subwindow << std::endl;
628  m.set("subwindow", (size_t)subwindow);
629 
630  int loadthreads = 4;
631 
632  m.start_time("degrees.runtime");
633 
634  /* Initialize streaming shards */
635  int blocksize = get_option_int("blocksize", 1024 * 1024);
636 
637  for(int p=0; p < nshards; p++) {
638  std::cout << "Initialize streaming shard: " << p << std::endl;
639  sliding_shards.push_back(
640  new slidingshard_t(iomgr, filename_shard_edata<EdgeDataType>(basefilename, p, nshards),
641  filename_shard_adj(basefilename, p, nshards), intervals[p].first,
642  intervals[p].second,
643  blocksize, m, true));
644  }
645  for(int p=0; p < nshards; p++) sliding_shards[p]->only_adjacency = true;
646 
647  graphchi_context ginfo;
648  ginfo.nvertices = 1 + intervals[nshards - 1].second;
649  ginfo.scheduler = NULL;
650 
651 
652  std::string outputfname = filename_degree_data(basefilename);
653 
654  int degreeOutF = open(outputfname.c_str(), O_RDWR | O_CREAT, S_IROTH | S_IWOTH | S_IWUSR | S_IRUSR);
655  if (degreeOutF < 0) {
656  logstream(LOG_ERROR) << "Could not create: " << degreeOutF << std::endl;
657  }
658  assert(degreeOutF >= 0);
659  int trerr = ftruncate(degreeOutF, ginfo.nvertices * sizeof(int) * 2);
660  assert(trerr == 0);
661 
662  for(int window=0; window<nshards; window++) {
663  metrics_entry mwi = m.start_time();
664 
665  vid_t interval_st = intervals[window].first;
666  vid_t interval_en = intervals[window].second;
667 
668  /* Flush stream shard for the window */
669  sliding_shards[window]->flush();
670 
671  /* Load shard[window] into memory */
672  memshard_t memshard(iomgr, filename_shard_edata<EdgeDataType>(basefilename, window, nshards), filename_shard_adj(basefilename, window, nshards),
673  interval_st, interval_en, m);
674  memshard.only_adjacency = true;
675  logstream(LOG_INFO) << "Interval: " << interval_st << " " << interval_en << std::endl;
676 
677  for(vid_t subinterval_st=interval_st; subinterval_st < interval_en; ) {
678  vid_t subinterval_en = std::min(interval_en, subinterval_st + subwindow);
679  logstream(LOG_INFO) << "(Degree proc.) Sub-window: [" << subinterval_st << " - " << subinterval_en << "]" << std::endl;
680  assert(subinterval_en >= subinterval_st && subinterval_en <= interval_en);
681 
682  /* Preallocate vertices */
683  metrics_entry men = m.start_time();
684  int nvertices = subinterval_en-subinterval_st+1;
685  std::vector< graphchi_vertex<int, dummy_t> > vertices(nvertices, graphchi_vertex<int, dummy_t>()); // preallocate
686 
687 
688  for(int i=0; i < nvertices; i++) {
689  vertices[i] = graphchi_vertex<int, dummy_t>(subinterval_st + i, NULL, NULL, 0, 0);
690  vertices[i].scheduled = true;
691  }
692 
693  metrics_entry me = m.start_time();
694  omp_set_num_threads(loadthreads);
695 #pragma omp parallel for
696  for(int p=-1; p < nshards; p++) {
697  if (p == (-1)) {
698  // if first window, now need to load the memshard
699  if (memshard.loaded() == false) {
700  memshard.load();
701  }
702 
703  /* Load vertices from memshard (only inedges for now so can be done in parallel) */
704  memshard.load_vertices(subinterval_st, subinterval_en, vertices);
705  } else {
706  /* Stream forward other than the window partition */
707  if (p != window) {
708  sliding_shards[p]->read_next_vertices(nvertices, subinterval_st, vertices, false);
709  }
710  }
711  }
712 
713  m.stop_time(me, "stream_ahead", window, true);
714 
715 
716  metrics_entry mev = m.start_time();
717  // Read first current values
718 
719  int * vbuf = (int*) malloc(nvertices*sizeof(int)*2);
720 
721  for(int i=0; i<nvertices; i++) {
722  vbuf[2 * i] = vertices[i].num_inedges();
723  vbuf[2 * i +1] = vertices[i].num_outedges();
724  }
725  pwritea(degreeOutF, vbuf, nvertices * sizeof(int) * 2, subinterval_st * sizeof(int) * 2);
726 
727  free(vbuf);
728 
729  // Move window
730  subinterval_st = subinterval_en+1;
731  }
732  /* Move the offset of the window-shard forward */
733  sliding_shards[window]->set_offset(memshard.offset_for_stream_cont(), memshard.offset_vid_for_stream_cont(),
734  memshard.edata_ptr_for_stream_cont());
735  }
736  close(degreeOutF);
737  m.stop_time("degrees.runtime");
738  delete iomgr;
739  }
740 
741 
742  }; // End class sharder
743 
744 }; // namespace
745 
746 
747 #endif
748 
749 
750