GraphChi  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Macros
graphchi_dynamicgraph_engine.hpp
Go to the documentation of this file.
1 
2 
31 #ifndef GRAPHCHI_DYNAMICGRAPHENGINE_DEF
32 #define GRAPHCHI_DYNAMICGRAPHENGINE_DEF
33 
34 #include <stdlib.h>
#include <string.h>
35 #include <vector>
36 
39 #include "logger/logger.hpp"
40 
41 
42 namespace graphchi {
43 
48  template <typename VertexDataType, typename EdgeDataType, typename svertex_t = graphchi_vertex<VertexDataType, EdgeDataType> >
49  class graphchi_dynamicgraph_engine : public graphchi_engine<VertexDataType, EdgeDataType, svertex_t> {
50  public:
53 
54  graphchi_dynamicgraph_engine(std::string base_filename, int nshards, bool selective_scheduling, metrics &_m) :
55  graphchi_engine<VertexDataType, EdgeDataType, svertex_t>(base_filename, nshards, selective_scheduling, _m){
56  _m.set("engine", "dynamicgraphs");
57  added_edges = 0;
58  maxshardsize = 200 * 1024 * 1024;
59  }
60 
61  protected:
62 
// Buffered, not-yet-committed edges, indexed [dst interval][src interval]:
// add_edge() stores into new_edge_buffers[get_shard_for(dst)][get_shard_for(src)].
66  std::vector< std::vector< edge_buffer * > > new_edge_buffers;
// Per-interval count of deleted in-edges seen in the current iteration
// (accumulated in load_before_updates() under SUPPORT_DELETIONS, reset in initialize_iter()).
67  std::vector<int> deletecounts;
// Per-interval suffix appended after ".dyngraph" to name the current shard files.
68  std::vector<std::string> shard_suffices;
69 
// Largest vertex id seen so far (initial graph or add_edge()).
70  vid_t max_vertex_id;
// Edge-buffer capacity in edges, derived from option "max_edgebuffer_mb" in init_buffers().
71  size_t max_edge_buffer;
// Value of added_edges at the last commit; added_edges - last_commit drives
// add_edge() back-pressure and the commit decision.
72  size_t last_commit;
// Number of edges accepted by add_edge(); not reset by commits in the code shown.
73  size_t added_edges;
// Engine state string reported by get_info_json() (e.g. "commit-ingests").
74  std::string state;
// Edge-data file size in bytes at which the commit code splits a shard in two.
75  size_t maxshardsize;
// Edge count cached by initialize_sliding_shards() from num_edges().
76  size_t edges_in_shards;
77 
78 
// Taken around scheduler->resize() in add_edge().
83  mutex schedulerlock;
// Taken while creating, deleting or iterating over sliding_shards.
84  mutex shardlock;
85 
89  virtual bool disable_preloading() {
90  return true;
91  }
92 
97  /* FIXME: This is bad software design - we should not have a filename dependency here. */
98  std::string orig_degree_file = filename_degree_data(this->base_filename);
99  std::string dynsuffix = ".dynamic";
100  std::string dynamic_degree_file = filename_degree_data(this->base_filename + dynsuffix);
101  cp(orig_degree_file, dynamic_degree_file);
102  return new degree_data(this->base_filename + dynsuffix, this->iomgr);
103  }
104  virtual size_t num_edges() {
105  shardlock.lock();
106  size_t ne = 0;
107  for(int i=0; i < this->nshards; i++) {
108  ne += this->sliding_shards[i]->num_edges();
109  for(int j=0; j < (int) new_edge_buffers[i].size(); j++)
110  ne += new_edge_buffers[i][j]->size();
111  }
112  shardlock.unlock();
113  return ne;
114  }
115 
116  public:
117 
118  size_t num_edges_safe() {
119  return added_edges + edges_in_shards;
120  }
121 
122  size_t num_buffered_edges() {
123  return added_edges;
124  }
125 
126  protected:
127  void init_buffers() {
128  max_edge_buffer = get_option_long("max_edgebuffer_mb", 1000) * 1024 * 1024 / sizeof(created_edge<EdgeDataType>);
129 
130  // Save old so if there are existing edges, they can be moved
131  std::vector< std::vector< edge_buffer * > > tmp_new_edge_buffers;
132  for(int i=0; i < this->nshards; i++) {
133  std::vector<edge_buffer *> shardbuffers = std::vector<edge_buffer *>();
134  for(int j=0; j < this->nshards; j++) {
135  shardbuffers.push_back(new edge_buffer());
136  }
137  tmp_new_edge_buffers.push_back(shardbuffers);
138  }
139 
140  // Move old edges. This is not the fastest way... but takes only about 0.05 secs
141  // on the twitter experiment
142  int i = 0;
143  for(typename std::vector< std::vector< edge_buffer * > >::iterator oldit = new_edge_buffers.begin();
144  oldit != new_edge_buffers.end(); ++oldit) {
145  for(typename std::vector< edge_buffer *>::iterator bufit = oldit->begin(); bufit != oldit->end(); ++bufit) {
146  edge_buffer &buffer_for_window = **bufit;
147  for(unsigned int ebi = 0; ebi < buffer_for_window.size(); ebi++ ) {
148  created_edge<EdgeDataType> * edge = buffer_for_window[ebi];
149  int shard = get_shard_for(edge->dst);
150  int srcshard = get_shard_for(edge->src);
151  i++;
152  tmp_new_edge_buffers[shard][srcshard]->add(*edge);
153  }
154  delete *bufit;
155  }
156  }
157 
158  std::cout << "TRANSFERRED " << i << " EDGES OVER." << std::endl;
159 
160  new_edge_buffers = tmp_new_edge_buffers;
161  }
162 
163 
167  // Should be changed to read the file in smaller chunks
168  size_t cp(std::string origfile, std::string dstfile, bool zeroout=false) {
169  char * buf;
170  int f = open(origfile.c_str(), O_RDONLY);
171  size_t len = readfull(f, &buf);
172  std::cout << "Length: " << len << std::endl;
173  std::cout << origfile << " ----> " << dstfile << std::endl;
174 
175  close(f);
176  remove(dstfile.c_str());
177  int of = open(dstfile.c_str(), O_WRONLY | O_CREAT, S_IROTH | S_IWOTH | S_IWUSR | S_IRUSR);
178  assert(of >= 0);
179  if (zeroout) {
180  memset(buf, 0, len);
181  }
182  writea(of, buf, len);
183 
184  assert(get_filesize(origfile) == get_filesize(dstfile));
185  close(of);
186  free(buf);
187  return len;
188  }
189 
190 
191  virtual typename base_engine::memshard_t * create_memshard(vid_t interval_st, vid_t interval_en) {
192  int p = this->exec_interval;
193  std::string adj_filename = filename_shard_adj(this->base_filename, 0, 0) + ".dyngraph" + shard_suffices[p];
194  std::string edata_filename = filename_shard_edata<EdgeDataType>(this->base_filename, 0, 0) + ".dyngraph" + shard_suffices[p];
195  return new typename base_engine::memshard_t(this->iomgr,
196  edata_filename,
197  adj_filename,
198  interval_st,
199  interval_en,
200  this->m);
201  }
202 
203 
207  virtual void initialize_sliding_shards() {
208  shardlock.lock();
209  if (this->sliding_shards.empty()) {
210  for(int p=0; p < this->nshards; p++) {
211  std::string adj_filename = filename_shard_adj(this->base_filename, 0, 0) + ".dyngraph" + shard_suffices[p];
212  std::string edata_filename = filename_shard_edata<EdgeDataType>(this->base_filename, 0, 0) + ".dyngraph" + shard_suffices[p];
213 
214  this->sliding_shards.push_back(
215  new typename base_engine::slidingshard_t(this->iomgr, edata_filename,
216  adj_filename,
217  this->intervals[p].first,
218  this->intervals[p].second,
219  this->blocksize,
220  this->m,
221  !this->modifies_outedges,
222  false));
223  }
224  } else {
225  for(int p=0; p < this->nshards; p++) {
226  if (this->sliding_shards[p] == NULL) {
227  std::string adj_filename = filename_shard_adj(this->base_filename, 0, 0) + ".dyngraph" + shard_suffices[p];
228  std::string edata_filename = filename_shard_edata<EdgeDataType>(this->base_filename, 0, 0) + ".dyngraph" + shard_suffices[p];
229 
230  this->sliding_shards[p] = new typename base_engine::slidingshard_t(this->iomgr, edata_filename,
231  adj_filename,
232  this->intervals[p].first,
233  this->intervals[p].second,
234  this->blocksize,
235  this->m,
236  !this->modifies_outedges,
237  false);
238  }
239  }
240  }
241  shardlock.unlock();
242  edges_in_shards = num_edges();
243 
244  }
245 
246  void prepare_clean_slate() {
247  logstream(LOG_INFO) << "Preparing clean slate..." << std::endl;
248  for(int shard=0; shard < this->nshards; shard++) {
249  shard_suffices.push_back(get_part_str(shard, this->nshards));
250 
251  std::string edata_filename = filename_shard_edata<EdgeDataType>(this->base_filename, shard, this->nshards);
252  std::string adj_filename = filename_shard_adj(this->base_filename, shard, this->nshards);
253  std::string dest_adj = filename_shard_adj(this->base_filename, 0, 0) + ".dyngraph" + shard_suffices[shard];
254  std::string dest_edata = filename_shard_edata<EdgeDataType>(this->base_filename, 0, 0) + ".dyngraph" + shard_suffices[shard];
255 
256  cp(edata_filename, dest_edata, true);
257  cp(adj_filename, dest_adj);
258  }
259  }
260 
261  int get_shard_for(vid_t dst) {
262  for(int i=0; i < this->nshards; i++) {
263  if (dst >= this->intervals[i].first && dst <= this->intervals[i].second) {
264  return i;
265  }
266  }
267  return this->nshards - 1; // Last shard
268  }
269 
270  public:
271  bool add_edge(vid_t src, vid_t dst, EdgeDataType edata) {
272  if (src == dst) {
273  logstream(LOG_WARNING) << "WARNING : tried to add self-edge!" << std::endl;
274  return true;
275  }
276  if (this->iter < 1) {
277  logstream(LOG_WARNING) << "Tried to add edge before first iteration has passed" << std::endl;
278  usleep(1000000);
279  return false;
280  }
281  if (added_edges - last_commit > 1.2 * max_edge_buffer) {
282  logstream(LOG_INFO) << "Over 20% of max buffer... hold on...." << std::endl;
283  usleep(1000000); // Sleep 1 sec
284  return false;
285  }
286  modification_lock.lock();
287  added_edges++;
288  int shard = get_shard_for(dst);
289  int srcshard = get_shard_for(src);
290  /* Maintain max vertex id */
291  vid_t prev_max_id = max_vertex_id;
292  max_vertex_id = std::max(max_vertex_id, dst);
293  max_vertex_id = std::max(max_vertex_id, src);
294 
295  // Extend degree and vertex data files
296  if (max_vertex_id>prev_max_id) {
297  this->degree_handler->ensure_size(this->max_vertex_id); // Expand the file
298 
299  // Expand scheduler
300  if (this->scheduler != NULL) {
301  schedulerlock.lock();
302  this->scheduler->resize(1 + max_vertex_id);
303  schedulerlock.unlock();
304  }
305  }
306 
307  // Add edge to buffers
308  new_edge_buffers[shard][srcshard]->add(src, dst, edata);
309  modification_lock.unlock();
310  return true;
311  }
312 
313  void add_task(vid_t vid) {
314  if (this->scheduler != NULL) {
315  modification_lock.lock();
316  this->scheduler->add_task(vid);
317  modification_lock.unlock();
318  }
319  }
320 
321  protected:
322  void incorporate_buffered_edges(int window, vid_t window_st, vid_t window_en, std::vector<svertex_t> & vertices) {
323  // Lock acquired
324  int ncreated = 0;
325  // First outedges
326  for(int shard=0; shard<this->nshards; shard++) {
327  edge_buffer &buffer_for_window = *new_edge_buffers[shard][window];
328  for(unsigned int ebi=0; ebi<buffer_for_window.size(); ebi++) {
329  created_edge<EdgeDataType> * edge = buffer_for_window[ebi];
330  if (edge->src >= window_st && edge->src <= window_en) {
331  if (vertices[edge->src-window_st].scheduled) {
332  if (vertices[edge->src-window_st].scheduled)
333  vertices[edge->src-window_st].add_outedge(edge->dst, &edge->data, false);
334  ncreated++;
335  }
336  }
337  }
338  }
339 
340  // Then inedges
341  for(int w=0; w<this->nshards; w++) {
342  edge_buffer &buffer_for_window = *new_edge_buffers[window][w];
343  for(unsigned int ebi=0; ebi<buffer_for_window.size(); ebi++) {
344  created_edge<EdgeDataType> * edge = buffer_for_window[ebi];
345  if (edge->dst >= window_st && edge->dst <= window_en) {
346  if (vertices[edge->dst - window_st].scheduled) {
347  assert(edge->data < 1e20);
348  if (vertices[edge->dst-window_st].scheduled)
349  vertices[edge->dst - window_st].add_inedge(edge->src, &edge->data, false);
350  ncreated++;
351  }
352  }
353  }
354  }
355  logstream(LOG_INFO) << "::: Used " << ncreated << " buffered edges." << std::endl;
356  }
357 
358  bool incorporate_new_edge_degrees(int window, vid_t window_st, vid_t window_en) {
359  bool modified = false;
360  // First outedges
361  for(int shard=0; shard < this->nshards; shard++) {
362  edge_buffer &buffer_for_window = *new_edge_buffers[shard][window];
363  for(unsigned int ebi=0; ebi<buffer_for_window.size(); ebi++) {
364  created_edge<EdgeDataType> * edge = buffer_for_window[ebi];
365  if (edge->src >= window_st && edge->src <= window_en) {
366  if (!edge->accounted_for_outc) {
367  degree d = this->degree_handler->get_degree(edge->src);
368  d.outdegree++;
369  this->degree_handler->set_degree(edge->src, d);
370 
371  modified = true;
372  edge->accounted_for_outc = true;
373  }
374  }
375  }
376  }
377 
378  // Then inedges
379  for(int w=0; w < this->nshards; w++) {
380  edge_buffer &buffer_for_window = *new_edge_buffers[window][w];
381  for(unsigned int ebi=0; ebi<buffer_for_window.size(); ebi++) {
382  created_edge<EdgeDataType> * edge = buffer_for_window[ebi];
383  if (edge->dst >= window_st && edge->dst <= window_en) {
384  if (!edge->accounted_for_inc) {
385  degree d = this->degree_handler->get_degree(edge->dst);
386  d.indegree++;
387  this->degree_handler->set_degree(edge->dst, d);
388  edge->accounted_for_inc = true;
389  modified = true;
390  }
391  }
392  }
393  }
394  return modified;
395  }
396 
397  void adjust_degrees_for_deleted(std::vector< svertex_t > &vertices, vid_t window_st) {
398 #ifdef SUPPORT_DELETIONS
399 
400  bool somechanged = false;
401  for(int i=0; i < (int)vertices.size(); i++) {
402  svertex_t &v = vertices[i];
403  if (v.scheduled) {
404  this->degree_handler->set_degree(v.id(), v.inc, v.outc);
405  somechanged = somechanged || (v.deleted_inc + v.deleted_outc > 0);
406  degree deg = this->degree_handler->get_degree(v.id());
407 
408  if (!(deg.indegree >=0 && deg.outdegree >= 0)) {
409  std::cout << "Degree discrepancy: " << deg.indegree << " " << deg.outdegree << std::endl;
410  }
411  assert(deg.indegree >=0 && deg.outdegree >= 0);
412  }
413  }
414  if (somechanged) {
415  this->degree_handler->save();
416  }
417 #endif
418  }
419 
420  virtual vid_t determine_next_window(vid_t iinterval, vid_t fromvid, vid_t maxvid, size_t membudget) {
421  /* Load degrees */
422  this->degree_handler->load(fromvid, maxvid);
423  if (incorporate_new_edge_degrees(iinterval, fromvid, maxvid)) {
424  this->degree_handler->save();
425  }
426 
427  size_t memreq = 0;
428  int max_interval = maxvid - fromvid;
429  for(int i=0; i < max_interval; i++) {
430  degree deg = this->degree_handler->get_degree(fromvid + i);
431  int inc = deg.indegree;
432  int outc = deg.outdegree;
433 
434  // Raw data and object cost included
435  memreq += sizeof(svertex_t) + (sizeof(EdgeDataType) + sizeof(vid_t) +
436  sizeof(graphchi_edge<EdgeDataType>))*(outc + inc);
437  if (memreq > membudget) {
438  return fromvid + i - 1; // Previous was enough
439  }
440  }
441  return maxvid;
442  }
443 
444 
445  virtual void load_before_updates(std::vector<svertex_t> &vertices) {
446  this->base_engine::load_before_updates(vertices);
447 
448 #ifdef SUPPORT_DELETIONS
449  for(unsigned int i=0; i < (unsigned int)vertices.size(); i++) {
450  deletecounts[this->exec_interval] += vertices[i].deleted_inc;
451  }
452  #endif
453  }
454 
455 
456  virtual void init_vertices(std::vector<svertex_t> &vertices,
457  graphchi_edge<EdgeDataType> * &edata) {
458  modification_lock.lock();
459  base_engine::init_vertices(vertices, edata);
460  incorporate_buffered_edges(this->exec_interval, this->sub_interval_st, this->sub_interval_en, vertices);
461  modification_lock.unlock();
462  }
463 
464 
/**
 * Per-iteration setup:
 * - stretches the last interval so it ends at max_vertex_id, covering vertices
 *   added since the previous iteration;
 * - calls check_size(max_vertex_id + 1) on the vertex data handler (presumably
 *   to grow it for those vertices; confirm against the handler);
 * - resets the per-interval deleted-edge counters to zero.
 * NOTE(review): source line 468 is missing from this listing; these comments
 * do not describe it.
 */
465  virtual void initialize_iter() {
466  this->intervals[this->nshards - 1].second = max_vertex_id;
467  this->vertex_data_handler->check_size(max_vertex_id + 1);
469 
470  /* Deleted edge tracking */
471  deletecounts.clear();
472  for(int p=0; p < this->nshards; p++)
473  deletecounts.push_back(0);
474  }
475 
/**
 * End-of-iteration hook. Unless this was the final iteration, it flushes every
 * sliding shard, rewinds it to offset 0, and waits for pending writes.
 * NOTE(review): source line 486 is missing from this listing. Given the comment
 * below, it presumably commits the buffered edges; verify against the original.
 */
476  virtual void iteration_finished() {
477  if (this->iter < this->niters - 1) {
478  // Flush and restart stream shards before committing edges
479  for(int p=0; p < this->nshards; p++) {
480  this->sliding_shards[p]->flush();
481  this->sliding_shards[p]->set_offset(0, 0, 0);
482  }
483 
// Make sure the flushed shard data has reached disk before going on.
484  this->iomgr->wait_for_writes();
485 
487  }
488  }
489 
490  virtual void initialize_before_run() {
491  prepare_clean_slate();
492  init_buffers();
493 
494  max_vertex_id = (vid_t) (this->num_vertices() - 1);
495 
496  this->vertex_data_handler->clear(this->num_vertices());
497  }
498 
499 
500  /* */
501  virtual void load_after_updates(std::vector<svertex_t> &vertices) {
502  this->base_engine::load_after_updates(vertices);
503  adjust_degrees_for_deleted(vertices, this->sub_interval_st);
504  }
505 
506  public:
507  void finish_after_iters(int extra_iters) {
508  this->chicontext.last_iteration = this->chicontext.iteration + extra_iters;
509  }
510 
511  protected:
512 
513 
514 #define BBUF 32000000
515 
520  // Count deleted
521  size_t ndeleted = 0;
522  for(size_t i=0; i < deletecounts.size(); i++) {
523  ndeleted += deletecounts[i];
524  }
525 
526  // TODO: remove ad hoc limits, move to configuration.
527  // Perhaps do some cost estimation?
528  logstream(LOG_DEBUG) << "Total deleted: " << ndeleted << " total edges: " << this->num_edges() << std::endl;
529 
530  if (added_edges - last_commit < max_edge_buffer * 0.8 && ndeleted < this->num_edges() * 0.1) {
531  std::cout << "==============================" << std::endl;
532  std::cout << "No time to commit yet.... Only " << (added_edges - last_commit) << " / " << max_edge_buffer
533  << " in buffers" << std::endl;
534  return;
535  }
536 
537 
538  bool rangeschanged = false;
539  state = "commit-ingests";
540  vid_t maxwindow = 4000000; // FIXME: HARDCODE
541  size_t mem_budget = this->membudget_mb * 1024 * 1024;
542  modification_lock.lock();
543 
544  // Clean up sliding shards
545  // NOTE: there is a problem since this will waste
546  // io-sessions
547  std::vector<int> edgespershard;
548  for(int p=0; p < this->nshards; p++) {
549  edgespershard.push_back(this->sliding_shards[p]->num_edges());
550  }
551 
552  std::vector<std::pair<vid_t, vid_t> > newranges;
553  std::vector<std::string> newsuffices;
554 
555  char iterstr[128];
556  sprintf(iterstr, "%d", this->iter);
557 
558  size_t min_buffer_in_shard_to_commit = max_edge_buffer / this->nshards / 2;
559 
560  std::vector<bool> was_commited(this->nshards, true);
561 
562  for(int shard=0; shard < this->nshards; shard++) {
563  std::vector<edge_buffer*> &shard_buffer = new_edge_buffers[shard];
564 
565  // Check there are any new edges
566  size_t bufedges = 0;
567  for(int w=0; w < this->nshards; w++) {
568  bufedges += shard_buffer[w]->size();
569 
570  }
571 
572  if (bufedges < min_buffer_in_shard_to_commit && deletecounts[shard] * 1.0 / edgespershard[shard] < 0.2) {
573  logstream(LOG_DEBUG) << shard << ": not enough edges for shard: " << bufedges << " deleted:" << deletecounts[shard] << "/" << edgespershard[shard] << std::endl;
574  newranges.push_back(this->intervals[shard]);
575  newsuffices.push_back(shard_suffices[shard]);
576  was_commited[shard] = false;
577  continue;
578  } else {
579  logstream(LOG_DEBUG) << shard << ": going to rewrite, deleted:" << deletecounts[shard] << "/" << edgespershard[shard] << " bufedges: " << bufedges << std::endl;
580  shardlock.lock();
581  delete this->sliding_shards[shard];
582  this->sliding_shards[shard] = NULL;
583  shardlock.unlock();
584  }
585  std::string origshardfile = filename_shard_edata<EdgeDataType>(this->base_filename, 0, 0) + ".dyngraph" + shard_suffices[shard];
586  std::string origadjfile = filename_shard_adj(this->base_filename, 0, 0) + ".dyngraph" + shard_suffices[shard];
587 
588  // Get file size
589  int of = open(origshardfile.c_str(), O_RDONLY);
590  off_t sz = lseek(of, 0, SEEK_END);
591  lseek(of, 0, SEEK_SET);
592  close(of);
593 
594  int outparts = ( sz >= (off_t) maxshardsize ? 2 : 1);
595 
596  vid_t splitpos = 0;
597  if (sz > (off_t)maxshardsize) {
598  rangeschanged = true;
599  // Compute number edges (not including ingested ones!)
600  size_t halfedges = (sz / sizeof(EdgeDataType)) / 2;
601  // Correct to include estimate of ingested ones
602  for(int w=0; w < this->nshards; w++) {
603  halfedges += new_edge_buffers[shard][w]->size() / 2;
604  }
605  size_t nedges = 0;
606 
607  vid_t st = this->intervals[shard].first;
608  splitpos = st + (this->intervals[shard].second - st) / 2;
609  bool found = false;
610  while(st < this->intervals[shard].second) {
611  vid_t en = std::min(st + maxwindow, this->intervals[shard].second);
612  this->degree_handler->load(st, en);
613  int nv = en - st + 1;
614 
615  for(int i=0; i<nv; i++) {
616  nedges += this->degree_handler->get_degree(st + i).indegree;
617  if (nedges >= halfedges) {
618  splitpos = i+st-1;
619  found = true;
620  break;
621  }
622  }
623  if (found) break;
624  st = en+1;
625  }
626  assert(splitpos > this->intervals[shard].first && splitpos < this->intervals[shard].second);
627  }
628 
629  for(int splits=0; splits<outparts; splits++) { // Note: this is not super-efficient because we do the operation twice in case of split
630  typename base_engine::slidingshard_t * curshard =
631  new typename base_engine::slidingshard_t(this->iomgr, origshardfile, origadjfile,
632  this->intervals[shard].first, this->intervals[shard].second,
633  1024 * 1024, this->m, true);
634 
635 
636  std::string suffix = "";
637  char partstr[128];
638  sprintf(partstr, "%d", shard);
639  if (splits == 0) {
640  suffix = std::string(partstr);
641  } else {
642  suffix = std::string(partstr) + ".split";
643  }
644  suffix = suffix + ".i" + std::string(iterstr);
645  newsuffices.push_back(suffix);
646  std::string outfile_edata = filename_shard_edata<EdgeDataType>(this->base_filename, 0, 0) + ".dyngraph" + suffix;
647  std::string outfile_adj = filename_shard_adj(this->base_filename, 0, 0) + ".dyngraph" + suffix;
648 
649  vid_t splitstart = this->intervals[shard].first;
650  vid_t splitend = this->intervals[shard].second;
651  if (shard == this->nshards - 1) splitend = max_vertex_id;
652 
653  // This is looking more and more hacky
654  if (outparts == 2) {
655  if (splits==0) splitend = splitpos;
656  else splitstart = splitpos+1;
657  }
658  newranges.push_back(std::pair<vid_t,vid_t>(splitstart, splitend));
659 
660  // Create the adj file
661  int f = open(outfile_adj.c_str(), O_WRONLY | O_CREAT, S_IROTH | S_IWOTH | S_IWUSR | S_IRUSR);
662  ftruncate(f, 0);
663  /* Create edge data file */
664  int ef = open(outfile_edata.c_str(), O_WRONLY | O_CREAT, S_IROTH | S_IWOTH | S_IWUSR | S_IRUSR);
665  ftruncate(ef, 0);
666  char * buf = (char*) malloc(BBUF);
667  char * bufptr = buf;
668  char * ebuf = (char*) malloc(BBUF);
669  char * ebufptr = ebuf;
670 
671  // Now create a new shard file window by window
672  for(int window=0; window < this->nshards; window++) {
673  vid_t range_st = this->intervals[window].first;
674  vid_t range_en = this->intervals[window].second;
675  if (window == this->nshards - 1) range_en = max_vertex_id;
676  edge_buffer &buffer_for_window = *new_edge_buffers[shard][window];
677 
678  for(vid_t window_st=range_st; window_st<range_en; ) {
679  // Check how much we can read
680  vid_t window_en = determine_next_window(window, window_st,
681  std::min(range_en, window_st + (vid_t)maxwindow), mem_budget);
682  // Create vertices
683  int nvertices = window_en-window_st+1;
684  std::vector< svertex_t > vertices(nvertices, svertex_t());
685  /* Allocate edge data: to do this, need to compute sum of in & out edges */
686  graphchi_edge<EdgeDataType> * edata = NULL;
687  size_t num_edges=0;
688  for(int i=0; i<nvertices; i++) {
689  degree d = this->degree_handler->get_degree(i + window_st);
690  num_edges += d.indegree+d.outdegree;
691  }
692  size_t ecounter = 0;
693  edata = (graphchi_edge<EdgeDataType>*)malloc(num_edges * sizeof(graphchi_edge<EdgeDataType>));
694  for(int i=0; i<(int)nvertices; i++) {
695  // int inc = degrees[i].indegree;
696  degree d = this->degree_handler->get_degree(i + window_st);
697  int outc = d.outdegree;
698  vertices[i] = svertex_t(window_st+i, &edata[ecounter],
699  &edata[ecounter+0], 0, outc);
700  vertices[i].scheduled = true; // guarantee that shard will read it
701  ecounter += 0 + outc;
702  }
703 
704  // Read vertices in
705  curshard->read_next_vertices(nvertices, window_st, vertices, false, true);
706 
707  // Incorporate buffered edges
708  for(unsigned int ebi=0; ebi<buffer_for_window.size(); ebi++) {
709  created_edge<EdgeDataType> * edge = buffer_for_window[ebi];
710  if (edge->src >= window_st && edge->src <= window_en) {
711  vertices[edge->src-window_st].add_outedge(edge->dst, &edge->data, false);
712 
713  }
714  }
715  this->iomgr->wait_for_reads();
716 
717  // If we are splitting, need to adjust counts
718  std::vector<int> adjusted_counts(vertices.size(), 0);
719  for(int iv=0; iv< (int)vertices.size(); iv++) adjusted_counts[iv] = vertices[iv].outc;
720 
721  if (outparts == 2) {
722  // do actual counts by removing the edges not in this split
723  for(int iv=0; iv< (int)vertices.size(); iv++) {
724  svertex_t &vertex = vertices[iv];
725  for(int i=0; i<vertex.outc; i++) {
726  if (!(vertex.outedge(i)->vertexid >= splitstart && vertex.outedge(i)->vertexid <= splitend)) {
727  adjusted_counts[iv]--;
728  }
729  }
730  }
731  }
732 
733 #ifdef SUPPORT_DELETIONS
734  // Adjust counts to remove deleted edges
735  for(int iv=0; iv< (int)vertices.size(); iv++) {
736  svertex_t &vertex = vertices[iv];
737  for(int i=0; i<vertex.outc; i++) {
738  if (is_deleted_edge_value(vertex.outedge(i)->get_data())) {
739  adjusted_counts[iv]--;
740  assert(false);
741  }
742  }
743  }
744 
745  // Adjust degrees
746  // adjust_degrees_for_deleted(vertices, window_st); // Double counting problem, that is why commented out.
747 #endif
748 
749  size_t ne = 0;
750  for(vid_t curvid=window_st; curvid<=window_en;) {
751  int iv = curvid - window_st;
752  svertex_t &vertex = vertices[iv];
753  int count = adjusted_counts[iv];
754  if (count == 0) {
755  // Check how many next ones are zeros
756  int nz=0;
757  curvid++;
758  for(; curvid <= window_en && nz<254; curvid++) {
759  if (adjusted_counts[curvid - window_st] == 0) {
760  nz++;
761  } else {
762  break;
763  }
764  }
765  uint8_t nnz = (uint8_t)nz;
766  // Write zero
767  bwrite<uint8_t>(f, buf, bufptr, 0);
768  bwrite<uint8_t>(f, buf, bufptr, nnz);
769  } else {
770  if (count < 255) {
771  uint8_t x = (uint8_t)count;
772  bwrite<uint8_t>(f, buf, bufptr, x);
773  } else {
774  bwrite<uint8_t>(f, buf, bufptr, 0xff);
775  bwrite<uint32_t>(f, buf, bufptr, (uint32_t)count);
776  }
777 
778  for(int i=0; i<vertex.outc; i++) {
779  if (vertex.outedge(i)->vertexid >= splitstart && vertex.outedge(i)->vertexid <= splitend) {
780 #ifdef SUPPORT_DELETIONS
781  if (is_deleted_edge_value(vertex.outedge(i)->get_data())) {
782  assert(false);
783 
784  }
785 #endif
786  bwrite(f, buf, bufptr, vertex.outedge(i)->vertexid);
787  bwrite<EdgeDataType>(ef, ebuf, ebufptr, vertex.outedge(i)->get_data());
788  ne++;
789  } else assert(outparts == 2);
790  }
791  curvid++;
792  }
793  }
794  free(edata);
795  window_st = window_en+1;
796  }
797 
798  } // end window
799 
800  // Flush buffers
801  writea(f, buf, bufptr-buf);
802  writea(ef, ebuf, ebufptr-ebuf);
803 
804  // Release
805  free(buf);
806  free(ebuf);
807 
808 
809  delete curshard;
810  close(f);
811  close(ef);
812 
813  this->iomgr->wait_for_writes();
814  } // splits
815 
816  // Delete old shard
817  std::string old_file_adj = filename_shard_adj(this->base_filename, 0, 0) + ".dyngraph" + shard_suffices[shard];
818  std::string old_file_edata = filename_shard_edata<EdgeDataType>(this->base_filename, 0, 0) + ".dyngraph" + shard_suffices[shard];
819  remove(old_file_adj.c_str());
820  remove(old_file_edata.c_str());
821  }
822 
823  // Clear buffers
824  for(int shard=0; shard < this->nshards; shard++) {
825  if (was_commited[shard]) {
826  for (int win=0; win < this->nshards; win++) {
827  edge_buffer &buffer_for_window = *new_edge_buffers[shard][win];
828  for(unsigned int ebi=0; ebi<buffer_for_window.size(); ebi++) {
829  created_edge<EdgeDataType> * edge = buffer_for_window[ebi];
830  if (!edge->accounted_for_outc) {
831  std::cout << "Edge not accounted (out)! " << edge->src << " -- " << edge->dst << std::endl;
832  }
833  if (!edge->accounted_for_inc) {
834  std::cout << "Edge not accounted (in)! " << edge->src << " -- " << edge->dst << std::endl;
835  }
836 
837  assert(edge->accounted_for_inc);
838  assert(edge->accounted_for_outc);
839  }
840  buffer_for_window.clear();
841  }
842  }
843  }
844 
845  // Update number of shards:
846  last_commit = added_edges;
847  this->intervals = newranges;
848  shard_suffices = newsuffices;
849  this->nshards = (int) this->intervals.size();
850 
851  /* If the vertex intervals change, need to recreate the shard objects. */
852  if (rangeschanged) {
853  shardlock.lock();
854  for (int i=0; i<(int)this->sliding_shards.size(); i++) {
855  if (this->sliding_shards[i] != NULL) delete this->sliding_shards[i];
856  }
857  this->sliding_shards.clear();
858  shardlock.unlock();
859  }
860  init_buffers();
861  modification_lock.unlock();
862  }
863  template <typename T>
864  void bwrite(int f, char * buf, char * &bufptr, T val) {
865  if (bufptr+sizeof(T)-buf>=BBUF) {
866  writea(f, buf, bufptr-buf);
867  bufptr = buf;
868  }
869  *((T*)bufptr) = val;
870  bufptr += sizeof(T);
871  }
872 
876  public:
/**
 * Builds a JSON status object for the HTTP monitor, under httplock.
 * Top-level fields: state, file, shard count, iteration counters, runtime,
 * updates, vertex/edge counts, and the current interval/window.
 * "shards" array: one entry per slot; NULL slots are reported as
 * "recreated", and shardlock is held while walking it.
 * json_params: each entry is appended as an extra string field.
 * NOTE(review): string values (state, base_filename, json_params keys/values)
 * are inserted without JSON escaping, so a quote or backslash in them produces
 * invalid JSON.
 * NOTE(review): source lines 882-884 are missing from this listing.
 */
877  std::string get_info_json() {
878  std::stringstream json;
879 
880  this->httplock.lock();
881 
885  json << "{";
886  json << "\"state\" : \"" << state << "\",\n";
887  json << "\"file\" : \"" << this->base_filename << "\",\n";
888  json << "\"numOfShards\": " << this->nshards << ",\n";
889  json << "\"iteration\": " << this->chicontext.iteration << ",\n";
890  json << "\"numIterations\": " << this->chicontext.num_iterations << ",\n";
891  json << "\"runTime\": " << this->chicontext.runtime() << ",\n";
892 
893  json << "\"updates\": " << this->nupdates << ",\n";
894  json << "\"nvertices\": " << this->chicontext.nvertices << ",\n";
// Cached count from the last initialize_sliding_shards(), not a live count.
895  json << "\"edges\": " << edges_in_shards << ",\n";
896 
// NOTE(review): added_edges looks cumulative (not reset at commit), so this may
// overstate what is currently buffered.
897  json << "\"edgesInBuffers\": " << added_edges << ",\n";
898 
899  json << "\"interval\":" << this->exec_interval << ",\n";
900  json << "\"windowStart\":" << this->sub_interval_st << ",";
901  json << "\"windowEnd\": " << this->sub_interval_en << ",";
902  json << "\"shards\": [";
903 
904 
// shardlock keeps commits from deleting shards while they are being described.
905  shardlock.lock();
906  for(int p=0; p < (int) this->sliding_shards.size(); p++) {
907  if (p>0) json << ",";
908 
909  typename base_engine::slidingshard_t * shard = this->sliding_shards[p];
910  if (shard != NULL) {
911  json << "{";
912  json << "\"p\": " << p << ", ";
913  json << shard->get_info_json();
914  json << "}";
915  } else {
// A NULL slot is a shard released during a commit and not yet re-created.
916  json << "{";
917  json << "\"p\": " << p << ", ";
918  json << "\"state\": \"recreated\"";
919  json << "}";
920  }
921  }
922  shardlock.unlock();
923 
924  json << "]";
925 
// Extra user-supplied key/value pairs, emitted as string fields.
926  std::map<std::string, std::string>::iterator it;
927  for(it=this->json_params.begin(); it != this->json_params.end(); ++it) {
928  json << ", \"" << it->first << "\":\"";
929  json << it->second << "\"";
930  }
931 
932  json << "}";
933 
934  this->httplock.unlock();
935  return json.str();
936  }
937 
938 
939 
940  }; // End class
941 
942 }; // End namespace
943 
944 
945 #endif
946 
947