31 #ifndef GRAPHCHI_DYNAMICGRAPHENGINE_DEF
32 #define GRAPHCHI_DYNAMICGRAPHENGINE_DEF
48 template <
typename VertexDataType,
typename EdgeDataType,
typename svertex_t = graphchi_vertex<VertexDataType, EdgeDataType> >
56 _m.set(
"engine",
"dynamicgraphs");
58 maxshardsize = 200 * 1024 * 1024;
67 std::vector<int> deletecounts;
68 std::vector<std::string> shard_suffices;
71 size_t max_edge_buffer;
76 size_t edges_in_shards;
98 std::string orig_degree_file = filename_degree_data(this->base_filename);
99 std::string dynsuffix =
".dynamic";
100 std::string dynamic_degree_file = filename_degree_data(this->base_filename + dynsuffix);
101 cp(orig_degree_file, dynamic_degree_file);
102 return new degree_data(this->base_filename + dynsuffix, this->iomgr);
107 for(
int i=0; i < this->nshards; i++) {
108 ne += this->sliding_shards[i]->num_edges();
119 return added_edges + edges_in_shards;
122 size_t num_buffered_edges() {
127 void init_buffers() {
128 max_edge_buffer = get_option_long(
"max_edgebuffer_mb", 1000) * 1024 * 1024 /
sizeof(created_edge<EdgeDataType>);
131 std::vector< std::vector< edge_buffer * > > tmp_new_edge_buffers;
132 for(
int i=0; i < this->nshards; i++) {
133 std::vector<edge_buffer *> shardbuffers = std::vector<edge_buffer *>();
134 for(
int j=0; j < this->nshards; j++) {
135 shardbuffers.push_back(
new edge_buffer());
137 tmp_new_edge_buffers.push_back(shardbuffers);
143 for(
typename std::vector< std::vector< edge_buffer * > >::iterator oldit =
new_edge_buffers.begin();
145 for(
typename std::vector< edge_buffer *>::iterator bufit = oldit->begin(); bufit != oldit->end(); ++bufit) {
146 edge_buffer &buffer_for_window = **bufit;
147 for(
unsigned int ebi = 0; ebi < buffer_for_window.size(); ebi++ ) {
148 created_edge<EdgeDataType> *
edge = buffer_for_window[ebi];
149 int shard = get_shard_for(edge->dst);
150 int srcshard = get_shard_for(edge->src);
152 tmp_new_edge_buffers[shard][srcshard]->add(*edge);
158 std::cout <<
"TRANSFERRED " << i <<
" EDGES OVER." << std::endl;
/**
 * cp: copy origfile to dstfile.
 *
 * Visible behaviour: opens origfile read-only, reads it fully into memory via
 * readfull(), logs the length and the copy direction to stdout, removes any
 * existing dstfile, creates dstfile and writes the buffer with writea(), then
 * asserts that both files have the same size.
 *
 * @param origfile source path.
 * @param dstfile  destination path (removed first if it exists).
 * @param zeroout  not referenced in the visible lines; its effect lives on
 *                 lines outside this excerpt (prepare_clean_slate passes true
 *                 when copying edge-data files) — TODO confirm what it does.
 * @return size_t; the return statement is not visible in this excerpt.
 */
168 size_t cp(std::string origfile, std::string dstfile,
bool zeroout=
false) {
// NOTE(review): the result of open() is not checked before readfull() uses
// it; a missing origfile would pass -1 straight through.
170 int f = open(origfile.c_str(), O_RDONLY);
171 size_t len = readfull(f, &buf);
172 std::cout <<
"Length: " << len << std::endl;
173 std::cout << origfile <<
" ----> " << dstfile << std::endl;
176 remove(dstfile.c_str());
// NOTE(review): mode S_IROTH | S_IWOTH | S_IWUSR | S_IRUSR makes the copy
// readable and writable by "other" (world-writable) while granting the group
// nothing — probably S_IRGRP / S_IWGRP (or 0644) was intended; confirm.
// The descriptor is not visibly checked for -1 before writea().
177 int of = open(dstfile.c_str(), O_WRONLY | O_CREAT, S_IROTH | S_IWOTH | S_IWUSR | S_IRUSR);
182 writea(of, buf, len);
// NOTE(review): assert() is compiled out under NDEBUG, so release builds do
// not verify that the copy is complete.
184 assert(get_filesize(origfile) == get_filesize(dstfile));
191 virtual typename base_engine::memshard_t * create_memshard(vid_t interval_st, vid_t interval_en) {
192 int p = this->exec_interval;
193 std::string adj_filename = filename_shard_adj(this->base_filename, 0, 0) +
".dyngraph" + shard_suffices[p];
194 std::string edata_filename = filename_shard_edata<EdgeDataType>(this->base_filename, 0, 0) +
".dyngraph" + shard_suffices[p];
195 return new typename base_engine::memshard_t(this->iomgr,
209 if (this->sliding_shards.empty()) {
210 for(
int p=0; p < this->nshards; p++) {
211 std::string adj_filename = filename_shard_adj(this->base_filename, 0, 0) +
".dyngraph" + shard_suffices[p];
212 std::string edata_filename = filename_shard_edata<EdgeDataType>(this->base_filename, 0, 0) +
".dyngraph" + shard_suffices[p];
214 this->sliding_shards.push_back(
217 this->intervals[p].first,
218 this->intervals[p].second,
221 !this->modifies_outedges,
225 for(
int p=0; p < this->nshards; p++) {
226 if (this->sliding_shards[p] == NULL) {
227 std::string adj_filename = filename_shard_adj(this->base_filename, 0, 0) +
".dyngraph" + shard_suffices[p];
228 std::string edata_filename = filename_shard_edata<EdgeDataType>(this->base_filename, 0, 0) +
".dyngraph" + shard_suffices[p];
232 this->intervals[p].first,
233 this->intervals[p].second,
236 !this->modifies_outedges,
246 void prepare_clean_slate() {
247 logstream(
LOG_INFO) <<
"Preparing clean slate..." << std::endl;
248 for(
int shard=0; shard < this->nshards; shard++) {
249 shard_suffices.push_back(get_part_str(shard, this->nshards));
251 std::string edata_filename = filename_shard_edata<EdgeDataType>(this->base_filename, shard, this->nshards);
252 std::string adj_filename = filename_shard_adj(this->base_filename, shard, this->nshards);
253 std::string dest_adj = filename_shard_adj(this->base_filename, 0, 0) +
".dyngraph" + shard_suffices[shard];
254 std::string dest_edata = filename_shard_edata<EdgeDataType>(this->base_filename, 0, 0) +
".dyngraph" + shard_suffices[shard];
256 cp(edata_filename, dest_edata,
true);
257 cp(adj_filename, dest_adj);
/**
 * get_shard_for: return the index of the shard whose vertex interval
 * [intervals[i].first, intervals[i].second] (both ends inclusive) contains
 * dst. Intervals are scanned linearly, so each call is O(nshards); add_edge()
 * calls it once for dst and once for src.
 *
 * If no interval contains dst, the last shard (nshards - 1) is returned. This
 * covers vertex ids above the last interval's upper end, e.g. ids introduced
 * by add_edge() (which raises max_vertex_id) after initialize_iter() last set
 * intervals[nshards - 1].second = max_vertex_id.
 */
261 int get_shard_for(vid_t dst) {
262 for(
int i=0; i < this->nshards; i++) {
263 if (dst >= this->intervals[i].first && dst <= this->intervals[i].second) {
// (match branch, original lines 264-266, not visible here; presumably returns i)
267 return this->nshards - 1;
/**
 * add_edge: buffer a new edge src -> dst carrying edata.
 *
 * Visible behaviour:
 *  - logs a warning when asked to add a self-edge;
 *  - logs a warning when called before the first iteration (this->iter < 1);
 *  - logs when the number of uncommitted edges (added_edges - last_commit)
 *    exceeds 1.2 * max_edge_buffer;
 *  - computes the destination shard (by dst) and source shard (by src);
 *  - when either endpoint is a new highest vertex id, raises max_vertex_id,
 *    grows the degree handler and, under schedulerlock, resizes the scheduler.
 * The control flow after each warning, the actual buffer insertion and the
 * returned value are on lines not visible in this excerpt.
 */
271 bool add_edge(vid_t src, vid_t dst, EdgeDataType edata) {
273 logstream(
LOG_WARNING) <<
"WARNING : tried to add self-edge!" << std::endl;
276 if (this->iter < 1) {
277 logstream(
LOG_WARNING) <<
"Tried to add edge before first iteration has passed" << std::endl;
// NOTE(review): the threshold is 120% of max_edge_buffer, i.e. 20% *over*
// capacity; the message "Over 20% of max buffer" reads ambiguously (as if
// only 20% of the capacity were used).
281 if (added_edges - last_commit > 1.2 * max_edge_buffer) {
282 logstream(
LOG_INFO) <<
"Over 20% of max buffer... hold on...." << std::endl;
288 int shard = get_shard_for(dst);
289 int srcshard = get_shard_for(src);
// NOTE(review): max_vertex_id is updated read-modify-write below. The use of
// schedulerlock suggests add_edge may be called concurrently; whether a lock
// covers this update is not visible — confirm.
// Also note the differing size conventions: degree_handler->ensure_size()
// receives max_vertex_id, while scheduler->resize() receives 1 + max_vertex_id.
// Confirm that ensure_size treats its argument as a maximum id, not a count.
291 vid_t prev_max_id = max_vertex_id;
292 max_vertex_id = std::max(max_vertex_id, dst);
293 max_vertex_id = std::max(max_vertex_id, src);
296 if (max_vertex_id>prev_max_id) {
297 this->degree_handler->ensure_size(this->max_vertex_id);
300 if (this->scheduler != NULL) {
301 schedulerlock.lock();
302 this->scheduler->resize(1 + max_vertex_id);
303 schedulerlock.unlock();
313 void add_task(vid_t vid) {
314 if (this->scheduler != NULL) {
316 this->scheduler->add_task(vid);
/**
 * incorporate_buffered_edges: attach edges that are still held in the
 * in-memory edge buffers (not yet written to a shard) to the vertices of the
 * current sub-window [window_st, window_en]; vertices[k] is vertex id
 * window_st + k.
 *
 * First pass: for each buffered edge whose src lies in the window and whose
 * src vertex is scheduled, add an out-edge towards edge->dst.
 * Second pass: for each buffered edge whose dst lies in the window and whose
 * dst vertex is scheduled, add an in-edge from edge->src.
 *
 * The vertices receive &edge->data, a pointer into the buffered edge itself,
 * so (presumably) update functions modify the buffered copy in place.
 * The buffers are filled indexed [dst shard][src shard] (see the transfer
 * loop that rebuilds them); which slice each pass reads is selected on lines
 * not visible here, as is the declaration/counting of ncreated.
 * Finally logs how many buffered edges were used.
 */
322 void incorporate_buffered_edges(
int window, vid_t window_st, vid_t window_en, std::vector<svertex_t> & vertices) {
326 for(
int shard=0; shard<this->nshards; shard++) {
328 for(
unsigned int ebi=0; ebi<buffer_for_window.size(); ebi++) {
329 created_edge<EdgeDataType> *
edge = buffer_for_window[ebi];
330 if (edge->src >= window_st && edge->src <= window_en) {
331 if (vertices[edge->src-window_st].scheduled) {
// NOTE(review): repeats the condition tested on the line above; redundant.
332 if (vertices[edge->src-window_st].scheduled)
333 vertices[edge->src-window_st].add_outedge(edge->dst, &edge->data,
false);
341 for(
int w=0; w<this->nshards; w++) {
343 for(
unsigned int ebi=0; ebi<buffer_for_window.size(); ebi++) {
344 created_edge<EdgeDataType> * edge = buffer_for_window[ebi];
345 if (edge->dst >= window_st && edge->dst <= window_en) {
346 if (vertices[edge->dst - window_st].scheduled) {
// NOTE(review): this sanity check requires EdgeDataType to be comparable
// with a double, which restricts the template; it presumably guards against
// garbage edge values and is compiled out under NDEBUG.
347 assert(edge->data < 1e20);
// NOTE(review): repeats the condition tested two lines above; redundant.
348 if (vertices[edge->dst-window_st].scheduled)
349 vertices[edge->dst - window_st].add_inedge(edge->src, &edge->data,
false);
355 logstream(
LOG_INFO) <<
"::: Used " << ncreated <<
" buffered edges." << std::endl;
358 bool incorporate_new_edge_degrees(
int window, vid_t window_st, vid_t window_en) {
359 bool modified =
false;
361 for(
int shard=0; shard < this->nshards; shard++) {
363 for(
unsigned int ebi=0; ebi<buffer_for_window.size(); ebi++) {
364 created_edge<EdgeDataType> * edge = buffer_for_window[ebi];
365 if (edge->src >= window_st && edge->src <= window_en) {
366 if (!edge->accounted_for_outc) {
367 degree d = this->degree_handler->get_degree(edge->src);
369 this->degree_handler->set_degree(edge->src, d);
372 edge->accounted_for_outc =
true;
379 for(
int w=0; w < this->nshards; w++) {
381 for(
unsigned int ebi=0; ebi<buffer_for_window.size(); ebi++) {
382 created_edge<EdgeDataType> * edge = buffer_for_window[ebi];
383 if (edge->dst >= window_st && edge->dst <= window_en) {
384 if (!edge->accounted_for_inc) {
385 degree d = this->degree_handler->get_degree(edge->dst);
387 this->degree_handler->set_degree(edge->dst, d);
388 edge->accounted_for_inc =
true;
397 void adjust_degrees_for_deleted(std::vector< svertex_t > &vertices, vid_t window_st) {
398 #ifdef SUPPORT_DELETIONS
400 bool somechanged =
false;
401 for(
int i=0; i < (int)vertices.size(); i++) {
402 svertex_t &v = vertices[i];
404 this->degree_handler->set_degree(v.id(), v.inc, v.outc);
405 somechanged = somechanged || (v.deleted_inc + v.deleted_outc > 0);
406 degree deg = this->degree_handler->get_degree(v.id());
408 if (!(deg.indegree >=0 && deg.outdegree >= 0)) {
409 std::cout <<
"Degree discrepancy: " << deg.indegree <<
" " << deg.outdegree << std::endl;
411 assert(deg.indegree >=0 && deg.outdegree >= 0);
415 this->degree_handler->save();
422 this->degree_handler->
load(fromvid, maxvid);
423 if (incorporate_new_edge_degrees(iinterval, fromvid, maxvid)) {
424 this->degree_handler->save();
428 int max_interval = maxvid - fromvid;
429 for(
int i=0; i < max_interval; i++) {
430 degree deg = this->degree_handler->get_degree(fromvid + i);
431 int inc = deg.indegree;
432 int outc = deg.outdegree;
435 memreq +=
sizeof(svertex_t) + (
sizeof(EdgeDataType) +
sizeof(vid_t) +
437 if (memreq > membudget) {
438 return fromvid + i - 1;
445 virtual void load_before_updates(std::vector<svertex_t> &vertices) {
446 this->base_engine::load_before_updates(vertices);
448 #ifdef SUPPORT_DELETIONS
449 for(
unsigned int i=0; i < (
unsigned int)vertices.size(); i++) {
450 deletecounts[this->exec_interval] += vertices[i].deleted_inc;
456 virtual void init_vertices(std::vector<svertex_t> &vertices,
457 graphchi_edge<EdgeDataType> * &edata) {
459 base_engine::init_vertices(vertices, edata);
460 incorporate_buffered_edges(this->exec_interval, this->sub_interval_st, this->sub_interval_en, vertices);
465 virtual void initialize_iter() {
466 this->intervals[this->nshards - 1].second = max_vertex_id;
467 this->vertex_data_handler->check_size(max_vertex_id + 1);
471 deletecounts.clear();
472 for(
int p=0; p < this->nshards; p++)
473 deletecounts.push_back(0);
476 virtual void iteration_finished() {
477 if (this->iter < this->niters - 1) {
479 for(
int p=0; p < this->nshards; p++) {
480 this->sliding_shards[p]->flush();
481 this->sliding_shards[p]->set_offset(0, 0, 0);
484 this->iomgr->wait_for_writes();
490 virtual void initialize_before_run() {
491 prepare_clean_slate();
494 max_vertex_id = (vid_t) (this->num_vertices() - 1);
496 this->vertex_data_handler->clear(this->num_vertices());
501 virtual void load_after_updates(std::vector<svertex_t> &vertices) {
502 this->base_engine::load_after_updates(vertices);
503 adjust_degrees_for_deleted(vertices, this->sub_interval_st);
507 void finish_after_iters(
int extra_iters) {
508 this->chicontext.last_iteration = this->chicontext.iteration + extra_iters;
514 #define BBUF 32000000
522 for(
size_t i=0; i < deletecounts.size(); i++) {
523 ndeleted += deletecounts[i];
528 logstream(
LOG_DEBUG) <<
"Total deleted: " << ndeleted <<
" total edges: " << this->
num_edges() << std::endl;
530 if (added_edges - last_commit < max_edge_buffer * 0.8 && ndeleted < this->
num_edges() * 0.1) {
531 std::cout <<
"==============================" << std::endl;
532 std::cout <<
"No time to commit yet.... Only " << (added_edges - last_commit) <<
" / " << max_edge_buffer
533 <<
" in buffers" << std::endl;
538 bool rangeschanged =
false;
539 state =
"commit-ingests";
540 vid_t maxwindow = 4000000;
541 size_t mem_budget = this->membudget_mb * 1024 * 1024;
547 std::vector<int> edgespershard;
548 for(
int p=0; p < this->nshards; p++) {
549 edgespershard.push_back(this->sliding_shards[p]->
num_edges());
552 std::vector<std::pair<vid_t, vid_t> > newranges;
553 std::vector<std::string> newsuffices;
556 sprintf(iterstr,
"%d", this->iter);
// Commit planning (fragment of the commit routine): a shard is left untouched
// (its old range and file suffix are carried over, was_commited = false) when
// it has few buffered edges AND a low deletion ratio; otherwise its sliding
// shard is deleted so its files can be rewritten further below.
// min_buffer_in_shard_to_commit is half of an even per-shard share of the
// edge buffer (integer division).
558 size_t min_buffer_in_shard_to_commit = max_edge_buffer / this->nshards / 2;
560 std::vector<bool> was_commited(this->nshards,
true);
562 for(
int shard=0; shard < this->nshards; shard++) {
567 for(
int w=0; w < this->nshards; w++) {
568 bufedges += shard_buffer[w]->size();
// NOTE(review): if edgespershard[shard] is 0 the ratio is 0.0/0 (NaN) or
// x/0 (+inf). Both make "< 0.2" false, so an empty shard is always rewritten
// even when it has few buffered edges. Guard with edgespershard[shard] > 0
// if that is not intended. edgespershard is a std::vector<int> filled from
// sliding_shards[p]->num_edges(); if num_edges() returns a wider type, very
// large shards would also be narrowed here.
572 if (bufedges < min_buffer_in_shard_to_commit && deletecounts[shard] * 1.0 / edgespershard[shard] < 0.2) {
573 logstream(
LOG_DEBUG) << shard <<
": not enough edges for shard: " << bufedges <<
" deleted:" << deletecounts[shard] <<
"/" << edgespershard[shard] << std::endl;
574 newranges.push_back(this->intervals[shard]);
575 newsuffices.push_back(shard_suffices[shard]);
576 was_commited[shard] =
false;
579 logstream(
LOG_DEBUG) << shard <<
": going to rewrite, deleted:" << deletecounts[shard] <<
"/" << edgespershard[shard] <<
" bufedges: " << bufedges << std::endl;
581 delete this->sliding_shards[shard];
582 this->sliding_shards[shard] = NULL;
// Shard rewrite planning (fragment of the commit routine): locate the shard's
// current edge-data and adjacency files, measure the edge-data file size, and
// decide whether the rewritten shard is emitted as one part or split in two.
// When splitting, splitpos starts at the midpoint of the shard's vertex range
// and is then searched for by summing in-degrees of the shard's vertices,
// window by window (at most maxwindow + 1 ids per window), until half of the
// edges in the edge-data file (sz / sizeof(EdgeDataType) / 2) are covered.
// The branch taken once nedges >= halfedges is not visible (presumably it
// records splitpos and stops).
585 std::string origshardfile = filename_shard_edata<EdgeDataType>(this->base_filename, 0, 0) +
".dyngraph" + shard_suffices[shard];
586 std::string origadjfile = filename_shard_adj(this->base_filename, 0, 0) +
".dyngraph" + shard_suffices[shard];
// NOTE(review): `of` is opened only to learn the file size via lseek; its
// open() result is not checked, and whether it is closed is not visible.
// get_filesize(origshardfile), already used by cp(), or fstat() would avoid
// the extra descriptor.
589 int of = open(origshardfile.c_str(), O_RDONLY);
590 off_t sz = lseek(of, 0, SEEK_END);
591 lseek(of, 0, SEEK_SET);
// NOTE(review): off-by-one inconsistency. outparts uses `sz >= maxshardsize`,
// but the split-point search below runs only for `sz > maxshardsize`. When
// sz == maxshardsize exactly, outparts == 2 yet splitpos is never computed
// and rangeschanged is never set. The two-part write later
// (splitend = splitpos / splitstart = splitpos + 1) would then use whatever
// value splitpos held before this point. Both comparisons should use the
// same operator.
594 int outparts = ( sz >= (off_t) maxshardsize ? 2 : 1);
597 if (sz > (off_t)maxshardsize) {
598 rangeschanged =
true;
600 size_t halfedges = (sz /
sizeof(EdgeDataType)) / 2;
602 for(
int w=0; w < this->nshards; w++) {
607 vid_t st = this->intervals[shard].first;
608 splitpos = st + (this->intervals[shard].second - st) / 2;
610 while(st < this->intervals[shard].second) {
611 vid_t en = std::min(st + maxwindow, this->intervals[shard].second);
612 this->degree_handler->
load(st, en);
613 int nv = en - st + 1;
615 for(
int i=0; i<nv; i++) {
616 nedges += this->degree_handler->get_degree(st + i).indegree;
617 if (nedges >= halfedges) {
// NOTE(review): compiled out under NDEBUG. It also fails for a shard whose
// interval holds a single vertex: the while loop above never runs and
// splitpos stays equal to .first. Such a shard cannot be split by vertex
// range at all.
626 assert(splitpos > this->intervals[shard].first && splitpos < this->intervals[shard].second);
629 for(
int splits=0; splits<outparts; splits++) {
632 this->intervals[shard].first, this->intervals[shard].second,
633 1024 * 1024, this->m,
true);
636 std::string suffix =
"";
638 sprintf(partstr,
"%d", shard);
640 suffix = std::string(partstr);
642 suffix = std::string(partstr) +
".split";
644 suffix = suffix +
".i" + std::string(iterstr);
645 newsuffices.push_back(suffix);
646 std::string outfile_edata = filename_shard_edata<EdgeDataType>(this->base_filename, 0, 0) +
".dyngraph" + suffix;
647 std::string outfile_adj = filename_shard_adj(this->base_filename, 0, 0) +
".dyngraph" + suffix;
649 vid_t splitstart = this->intervals[shard].first;
650 vid_t splitend = this->intervals[shard].second;
651 if (shard == this->nshards - 1) splitend = max_vertex_id;
655 if (splits==0) splitend = splitpos;
656 else splitstart = splitpos+1;
658 newranges.push_back(std::pair<vid_t,vid_t>(splitstart, splitend));
// Open the output adjacency (f) and edge-data (ef) files for this split part
// and allocate one BBUF-byte staging buffer for each. bwrite() appends to
// these buffers and flushes them to f / ef when they fill up.
// NOTE(review): O_CREAT is used without O_TRUNC. If a file with this name
// already exists (e.g. left by an earlier run that reached the same
// shard/iteration suffix) and is not removed on lines not visible here
// (662-663), bytes beyond the newly written length survive and corrupt the
// shard. cp() removes its destination first; the same (or O_TRUNC) seems
// intended here.
// The mode grants read/write to "other" and nothing to the group
// (world-writable). The open() and malloc() results are not visibly checked.
661 int f = open(outfile_adj.c_str(), O_WRONLY | O_CREAT, S_IROTH | S_IWOTH | S_IWUSR | S_IRUSR);
664 int ef = open(outfile_edata.c_str(), O_WRONLY | O_CREAT, S_IROTH | S_IWOTH | S_IWUSR | S_IRUSR);
666 char * buf = (
char*) malloc(BBUF);
668 char * ebuf = (
char*) malloc(BBUF);
669 char * ebufptr = ebuf;
// Walk every source window's vertex range, with the last range extended to
// max_vertex_id. Each range is processed in sub-windows
// [window_st, window_en]; window_en comes from a call (its start is on lines
// not visible) whose arguments include min(range_en, window_st + maxwindow)
// and mem_budget. The vertices of each sub-window are then materialised
// (vertices[k] is vertex id window_st + k).
672 for(
int window=0; window < this->nshards; window++) {
673 vid_t range_st = this->intervals[window].first;
674 vid_t range_en = this->intervals[window].second;
675 if (window == this->nshards - 1) range_en = max_vertex_id;
// NOTE(review): ranges are inclusive (nvertices = window_en - window_st + 1)
// and window_st advances to window_en + 1 after each sub-window. Yet the loop
// condition is `window_st < range_en`. If a sub-window ends at range_en - 1,
// the last vertex range_en is never visited, and presumably its out-edges are
// never written to the rewritten shard. Example: range [0, 5] with a window
// bound of 4, where the call returns its upper bound when not
// memory-limited. A single-vertex range (range_st == range_en) is likewise
// skipped. `window_st <= range_en` looks intended — confirm.
678 for(vid_t window_st=range_st; window_st<range_en; ) {
681 std::min(range_en, window_st + (vid_t)maxwindow), mem_budget);
683 int nvertices = window_en-window_st+1;
684 std::vector< svertex_t > vertices(nvertices, svertex_t());
688 for(
int i=0; i<nvertices; i++) {
689 degree d = this->degree_handler->get_degree(i + window_st);
690 num_edges += d.indegree+d.outdegree;
694 for(
int i=0; i<(int)nvertices; i++) {
696 degree d = this->degree_handler->get_degree(i + window_st);
697 int outc = d.outdegree;
698 vertices[i] = svertex_t(window_st+i, &edata[ecounter],
699 &edata[ecounter+0], 0, outc);
700 vertices[i].scheduled =
true;
701 ecounter += 0 + outc;
708 for(
unsigned int ebi=0; ebi<buffer_for_window.size(); ebi++) {
710 if (edge->src >= window_st && edge->src <= window_en) {
711 vertices[edge->src-window_st].add_outedge(edge->dst, &edge->data,
false);
715 this->iomgr->wait_for_reads();
718 std::vector<int> adjusted_counts(vertices.size(), 0);
719 for(
int iv=0; iv< (int)vertices.size(); iv++) adjusted_counts[iv] = vertices[iv].outc;
723 for(
int iv=0; iv< (int)vertices.size(); iv++) {
724 svertex_t &vertex = vertices[iv];
725 for(
int i=0; i<vertex.outc; i++) {
726 if (!(vertex.outedge(i)->vertexid >= splitstart && vertex.outedge(i)->vertexid <= splitend)) {
727 adjusted_counts[iv]--;
733 #ifdef SUPPORT_DELETIONS
735 for(
int iv=0; iv< (int)vertices.size(); iv++) {
736 svertex_t &vertex = vertices[iv];
737 for(
int i=0; i<vertex.outc; i++) {
738 if (is_deleted_edge_value(vertex.outedge(i)->get_data())) {
739 adjusted_counts[iv]--;
750 for(vid_t curvid=window_st; curvid<=window_en;) {
751 int iv = curvid - window_st;
752 svertex_t &vertex = vertices[iv];
753 int count = adjusted_counts[iv];
758 for(; curvid <= window_en && nz<254; curvid++) {
759 if (adjusted_counts[curvid - window_st] == 0) {
765 uint8_t nnz = (uint8_t)nz;
767 bwrite<uint8_t>(f, buf, bufptr, 0);
768 bwrite<uint8_t>(f, buf, bufptr, nnz);
771 uint8_t x = (uint8_t)count;
772 bwrite<uint8_t>(f, buf, bufptr, x);
774 bwrite<uint8_t>(f, buf, bufptr, 0xff);
775 bwrite<uint32_t>(f, buf, bufptr, (uint32_t)count);
778 for(
int i=0; i<vertex.outc; i++) {
779 if (vertex.outedge(i)->vertexid >= splitstart && vertex.outedge(i)->vertexid <= splitend) {
780 #ifdef SUPPORT_DELETIONS
781 if (is_deleted_edge_value(vertex.outedge(i)->get_data())) {
786 bwrite(f, buf, bufptr, vertex.outedge(i)->vertexid);
787 bwrite<EdgeDataType>(ef, ebuf, ebufptr, vertex.outedge(i)->get_data());
789 }
else assert(outparts == 2);
795 window_st = window_en+1;
801 writea(f, buf, bufptr-buf);
802 writea(ef, ebuf, ebufptr-ebuf);
813 this->iomgr->wait_for_writes();
817 std::string old_file_adj = filename_shard_adj(this->base_filename, 0, 0) +
".dyngraph" + shard_suffices[shard];
818 std::string old_file_edata = filename_shard_edata<EdgeDataType>(this->base_filename, 0, 0) +
".dyngraph" + shard_suffices[shard];
819 remove(old_file_adj.c_str());
820 remove(old_file_edata.c_str());
824 for(
int shard=0; shard < this->nshards; shard++) {
825 if (was_commited[shard]) {
826 for (
int win=0; win < this->nshards; win++) {
828 for(
unsigned int ebi=0; ebi<buffer_for_window.size(); ebi++) {
830 if (!edge->accounted_for_outc) {
831 std::cout <<
"Edge not accounted (out)! " << edge->src <<
" -- " << edge->dst << std::endl;
833 if (!edge->accounted_for_inc) {
834 std::cout <<
"Edge not accounted (in)! " << edge->src <<
" -- " << edge->dst << std::endl;
837 assert(edge->accounted_for_inc);
838 assert(edge->accounted_for_outc);
840 buffer_for_window.clear();
846 last_commit = added_edges;
847 this->intervals = newranges;
848 shard_suffices = newsuffices;
849 this->nshards = (int) this->intervals.size();
854 for (
int i=0; i<(int)this->sliding_shards.size(); i++) {
855 if (this->sliding_shards[i] != NULL)
delete this->sliding_shards[i];
857 this->sliding_shards.clear();
/**
 * bwrite: append `val` to the staging buffer `buf` at the write cursor
 * `bufptr`. The cursor is passed by reference so the callee can advance the
 * caller's cursor.
 *
 * If appending sizeof(T) more bytes would reach BBUF bytes, the bytes buffered
 * so far are first written to descriptor `f` with writea(). The cursor reset
 * and the copy of `val` happen on lines not visible in this excerpt.
 * Callers flush whatever remains themselves once they are done; the commit
 * routine calls writea(f, buf, bufptr-buf) after its loops.
 *
 * @param f      file descriptor the buffer is flushed to.
 * @param buf    start of a buffer of BBUF bytes.
 * @param bufptr in/out write cursor inside buf.
 * @param val    value to append (copy mechanism not visible here).
 */
863 template <
typename T>
864 void bwrite(
int f,
char * buf,
char * &bufptr, T val) {
865 if (bufptr+
sizeof(T)-buf>=BBUF) {
866 writea(f, buf, bufptr-buf);
878 std::stringstream json;
880 this->httplock.lock();
886 json <<
"\"state\" : \"" << state <<
"\",\n";
887 json <<
"\"file\" : \"" << this->base_filename <<
"\",\n";
888 json <<
"\"numOfShards\": " << this->nshards <<
",\n";
889 json <<
"\"iteration\": " << this->chicontext.iteration <<
",\n";
890 json <<
"\"numIterations\": " << this->chicontext.num_iterations <<
",\n";
891 json <<
"\"runTime\": " << this->chicontext.runtime() <<
",\n";
893 json <<
"\"updates\": " << this->nupdates <<
",\n";
894 json <<
"\"nvertices\": " << this->chicontext.nvertices <<
",\n";
895 json <<
"\"edges\": " << edges_in_shards <<
",\n";
897 json <<
"\"edgesInBuffers\": " << added_edges <<
",\n";
899 json <<
"\"interval\":" << this->exec_interval <<
",\n";
900 json <<
"\"windowStart\":" << this->sub_interval_st <<
",";
901 json <<
"\"windowEnd\": " << this->sub_interval_en <<
",";
902 json <<
"\"shards\": [";
906 for(
int p=0; p < (int) this->sliding_shards.size(); p++) {
907 if (p>0) json <<
",";
912 json <<
"\"p\": " << p <<
", ";
913 json << shard->get_info_json();
917 json <<
"\"p\": " << p <<
", ";
918 json <<
"\"state\": \"recreated\"";
926 std::map<std::string, std::string>::iterator it;
927 for(it=this->json_params.begin(); it != this->json_params.end(); ++it) {
928 json <<
", \"" << it->first <<
"\":\"";
929 json << it->second <<
"\"";
934 this->httplock.unlock();