29 #ifndef DEF_GRAPHCHI_GRAPHCHI_ENGINE
30 #define DEF_GRAPHCHI_GRAPHCHI_ENGINE
62 typename svertex_t = graphchi_vertex<VertexDataType, EdgeDataType> >
// Base path of the graph; the shard, adjacency and intervals file names used below are derived from it.
70 std::string base_filename;
// Sliding-shard readers; initialize_sliding_shards() appends one entry per shard index.
77 std::vector<slidingshard_t *> sliding_shards;
// (first, second) vertex-id bounds, one pair appended per shard by the intervals-file loader.
79 std::vector<std::pair<vid_t, vid_t> > intervals;
// Set to true by the constructor; changed through set_modifies_outedges().
92 bool modifies_outedges;
// Set to true by the constructor; passed to memoryshard->commit() once an interval has been processed.
93 bool modifies_inedges;
// When true, initialize_scheduler() creates a bitset_scheduler and the iteration loop consults it.
95 bool use_selective_scheduling;
// When true (the constructor's value), exec_updates() runs vertices that are not parallel_safe in a separate serial loop.
96 bool enable_deterministic_parallelism;
// Inclusive vertex-id bounds of the sub-interval currently being processed.
105 vid_t sub_interval_st;
106 vid_t sub_interval_en;
117 void print_config() {
118 logstream(
LOG_INFO) <<
"Engine configuration: " << std::endl;
119 logstream(
LOG_INFO) <<
" exec_threads = " << exec_threads << std::endl;
120 logstream(
LOG_INFO) <<
" load_threads = " << load_threads << std::endl;
121 logstream(
LOG_INFO) <<
" membudget_mb = " << membudget_mb << std::endl;
122 logstream(
LOG_INFO) <<
" blocksize = " << blocksize << std::endl;
123 logstream(
LOG_INFO) <<
" scheduler = " << use_selective_scheduling << std::endl;
// Constructs the engine for the graph stored under _base_filename.
//   _base_filename        - copied into base_filename
//   _nshards              - initial shard count; the visible code may replace it with the
//                           "nshards" option and warns that detection will be attempted
//   _selective_scheduling - copied into use_selective_scheduling
//   _m                    - metrics object used to initialise member m
// NOTE(review): this listing has gaps (some original lines are absent), so the conditions
// guarding several statements below cannot be seen here.
134 graphchi_engine(std::string _base_filename,
int _nshards,
bool _selective_scheduling,
metrics &_m) : base_filename(_base_filename), nshards(_nshards), use_selective_scheduling(_selective_scheduling), m(_m) {
// NOTE(review): disable_preloading() is declared virtual but is called from the constructor,
// so an override in a derived class is not the version that runs here.
137 if (disable_preloading()) {
138 iomgr->set_disable_preloading(
true);
140 logstream(
LOG_INFO) <<
"Initializing graphchi_engine. This engine expects " <<
sizeof(EdgeDataType)
141 <<
"-byte edge data. " << std::endl;
// Shard count taken from the "nshards" option (0 when the option is absent).
145 nshards = get_option_int(
"nshards", 0);
147 logstream(
LOG_WARNING) <<
"Number of shards was not specified (command-line argument 'nshards'). Trying to detect. " << std::endl;
// Defaults: both edge directions are treated as modifiable and edge data is used.
154 modifies_outedges =
true;
155 modifies_inedges =
true;
156 only_adjacency =
false;
// Tunables read from options, with the fallback values given as second argument.
157 blocksize = get_option_long(
"blocksize", 1024 * 1024);
158 membudget_mb = get_option_int(
"membudget_mb", 1024);
164 store_inedges =
true;
165 enable_deterministic_parallelism =
true;
166 load_threads = get_option_int(
"loadthreads", 2);
// Execution threads default to the OpenMP maximum.
167 exec_threads = get_option_int(
"execthreads", omp_get_max_threads());
172 _m.set(
"engine",
"default");
// Teardown fragment (the enclosing function header is not visible in this listing;
// presumably the destructor - TODO confirm).
// Releases the degree and vertex-data handlers.
176 if (degree_handler != NULL)
delete degree_handler;
177 if (vertex_data_handler != NULL)
delete vertex_data_handler;
// The body of this branch is not visible here.
178 if (memoryshard != NULL) {
// Delete every sliding shard and clear its slot.
182 for(
int i=0; i < (int)sliding_shards.size(); i++) {
183 if (sliding_shards[i] != NULL) {
184 delete sliding_shards[i];
186 sliding_shards[i] = NULL;
// Null the handler pointers after deletion.
188 degree_handler = NULL;
189 vertex_data_handler = NULL;
196 virtual degree_data * create_degree_handler() {
197 return new degree_data(base_filename, iomgr);
200 virtual bool disable_preloading() {
// Shard-detection fragment (the enclosing function header and the condition guarding the
// error messages are not visible in this listing).
// Asks find_shards for a shard count matching this engine's EdgeDataType.
209 int _nshards = find_shards<EdgeDataType>(base_filename);
// Error messages explaining how to create shards or specify their number.
211 logstream(
LOG_ERROR) <<
"Could not find suitable shards - maybe you need to run sharder to create them?" << std::endl;
212 logstream(
LOG_ERROR) <<
"You need to create the shards with edge data-type of size " <<
sizeof(EdgeDataType) <<
" bytes." << std::endl;
213 logstream(
LOG_ERROR) <<
"To specify the number of shards, use command-line parameter 'nshards'" << std::endl;
// Creates one slidingshard_t per shard index and appends it to sliding_shards, adding
// each new shard's num_edges() to nedges.
// Precondition (asserted): sliding_shards is empty on entry.
// NOTE(review): several original lines are absent from this listing, including most of the
// slidingshard_t constructor arguments.
220 virtual void initialize_sliding_shards() {
221 assert(sliding_shards.size() == 0);
222 for(
int p=0; p < nshards; p++) {
// File names for shard p, derived from base_filename and the shard count.
223 std::string edata_filename = filename_shard_edata<EdgeDataType>(base_filename, p, nshards);
224 std::string adj_filename = filename_shard_adj(base_filename, p, nshards);
232 sliding_shards.push_back(
233 new slidingshard_t(iomgr, edata_filename,
// Accumulate the edge count of the shard that was just appended.
242 nedges += sliding_shards[sliding_shards.size() - 1]->num_edges();
// When selective scheduling is enabled, creates a bitset_scheduler sized to num_vertices()
// and marks every vertex as scheduled.
// NOTE(review): the remainder of this function (e.g. what happens when selective
// scheduling is disabled) is not visible in this listing.
247 virtual void initialize_scheduler() {
248 if (use_selective_scheduling) {
249 scheduler =
new bitset_scheduler((
int) num_vertices());
250 scheduler->add_task_to_all();
// Window-sizing fragment (the enclosing function header and the computation of memreq are
// not visible in this listing).
// Loads degree data for [fromvid, maxvid], then walks the vertices in order.
263 degree_handler->
load(fromvid, maxvid);
266 int max_interval = maxvid - fromvid;
267 for(
int i=0; i < max_interval; i++) {
268 degree deg = degree_handler->get_degree(fromvid + i);
269 int inc = deg.indegree;
270 int outc = deg.outdegree;
// Once the running requirement exceeds the budget, the previous vertex is returned.
// NOTE(review): if this triggers at i == 0 the result is fromvid - 1; confirm callers tolerate that.
274 if (memreq > membudget) {
275 return fromvid + i - 1;
// Edge-counting fragment (the enclosing function header is not visible in this listing).
// Sums, over the inclusive vertex range [st, en], in-degree (only when store_inedges is
// set) plus out-degree.
287 int nvertices = en - st + 1;
// With a scheduler present the scheduled state of each vertex is queried; how is_sched
// is used is not visible here (presumably unscheduled vertices are skipped - TODO confirm).
288 if (scheduler != NULL) {
289 for(
int i=0; i < nvertices; i++) {
290 bool is_sched = scheduler->is_scheduled(st + i);
292 degree d = degree_handler->get_degree(st + i);
293 num_edges += d.indegree * store_inedges + d.outdegree;
// Second loop performs the same sum for every vertex.
297 for(
int i=0; i < nvertices; i++) {
298 degree d = degree_handler->get_degree(st + i);
299 num_edges += d.indegree * store_inedges + d.outdegree;
// Loads the data needed before updates run on the current sub-interval: memory-shard
// vertices, vertex data, and the next vertices from the sliding shards other than
// exec_interval. Ends by waiting for outstanding reads on iomgr.
//   vertices - vertex objects of the current sub-interval, filled in by the shard loaders
// NOTE(review): several original lines are absent from this listing, including the branch
// that separates the p == -1 case from the sliding-shard case.
305 virtual void load_before_updates(std::vector<svertex_t> &vertices) {
306 omp_set_num_threads(load_threads);
// The loop index starts at -1, one position before the first shard index.
307 #pragma omp parallel for schedule(dynamic, 1)
308 for(
int p=-1; p < nshards; p++) {
311 if (!memoryshard->loaded()) {
316 memoryshard->load_vertices(sub_interval_st, sub_interval_en, vertices);
319 vertex_data_handler->load(sub_interval_st, sub_interval_en);
// The shard of the interval being executed is not read through its sliding shard.
322 if (p != exec_interval) {
323 sliding_shards[p]->read_next_vertices((
int) vertices.size(), sub_interval_st, vertices,
324 scheduler != NULL && chicontext.iteration == 0);
331 iomgr->wait_for_reads();
// Runs the user program's update() over the vertices of the current sub-interval and
// records the elapsed time under the "execute-updates" metric.
//   userprogram - program whose update(v, chicontext) is invoked per vertex
//   vertices    - vertex objects for [sub_interval_st, sub_interval_en], indexed by
//                 vid - sub_interval_st
// NOTE(review): several original lines are absent from this listing (including the braces
// and any section markers around the OpenMP pragmas), so the exact nesting cannot be
// confirmed from here.
334 void exec_updates(GraphChiProgram<VertexDataType, EdgeDataType, svertex_t> &userprogram,
335 std::vector<svertex_t> &vertices) {
336 metrics_entry me = m.start_time();
337 size_t nvertices = vertices.size();
// Without deterministic parallelism every vertex is marked safe to run in parallel.
338 if (!enable_deterministic_parallelism) {
339 for(
int i=0; i < (int)nvertices; i++) vertices[i].parallel_safe =
true;
342 omp_set_num_threads(exec_threads);
344 #pragma omp parallel sections
// Parallel pass: updates vertices that are parallel_safe (or all, when single-threaded).
348 #pragma omp parallel for schedule(dynamic)
349 for(
int vid=sub_interval_st; vid <= (int)sub_interval_en; vid++) {
350 svertex_t & v = vertices[vid - sub_interval_st];
352 if (exec_threads == 1 || v.parallel_safe) {
353 v.dataptr = vertex_data_handler->vertex_data_ptr(vid);
355 userprogram.update(v, chicontext);
// Serial pass: with several threads and deterministic parallelism, scheduled vertices
// that are not parallel_safe are updated one at a time.
361 if (exec_threads > 1 && enable_deterministic_parallelism) {
362 int nonsafe_count = 0;
363 for(
int vid=sub_interval_st; vid <= (int)sub_interval_en; vid++) {
364 svertex_t & v = vertices[vid - sub_interval_st];
365 if (!v.parallel_safe && v.scheduled) {
366 v.dataptr = vertex_data_handler->vertex_data_ptr(vid);
367 userprogram.update(v, chicontext);
// nonsafe_count is reported as a metric; where it is incremented is not visible here.
372 m.
add(
"serialized-updates", nonsafe_count);
376 m.stop_time(me,
"execute-updates");
// Allocates one contiguous edge buffer for the sub-interval and constructs a vertex
// object per vertex whose in-/out-edge pointers point into that buffer.
//   vertices - pre-sized vector; element i is overwritten with the vertex sub_interval_st + i
//   edata    - out-parameter receiving the malloc'ed buffer (release is not shown here)
// NOTE(review): several original lines are absent from this listing, including where
// num_edges and ecounter are defined and the branches around the scheduled flag.
379 virtual void init_vertices(std::vector<svertex_t> &vertices, graphchi_edge<EdgeDataType> * &edata) {
380 size_t nvertices = vertices.size();
// NOTE(review): no check of the malloc result is visible in this listing.
386 edata = (graphchi_edge<EdgeDataType>*) malloc(num_edges *
sizeof(graphchi_edge<EdgeDataType>));
390 for(
int i=0; i < (int)nvertices; i++) {
391 degree d = degree_handler->get_degree(sub_interval_st + i);
392 int inc = d.indegree;
393 int outc = d.outdegree;
// In-edges start at ecounter; out-edges follow the in-edges (or start at the same
// position when store_inedges is false, since inc * store_inedges is then 0).
394 vertices[i] = svertex_t(sub_interval_st + i, &edata[ecounter],
395 &edata[ecounter + inc * store_inedges], inc, outc);
396 if (scheduler != NULL) {
397 bool is_sched = scheduler->is_scheduled(sub_interval_st + i);
399 vertices[i].scheduled =
true;
401 ecounter += inc * store_inedges + outc;
// A second path also marks the vertex scheduled and advances ecounter; its guarding
// condition is not visible here.
405 vertices[i].scheduled =
true;
406 ecounter += inc * store_inedges + outc;
// Saves vertex data through vertex_data_handler->save() if at least one vertex in
// `vertices` has its modified flag set; otherwise nothing is saved.
// NOTE(review): a few original lines of this function are absent from this listing.
413 void save_vertices(std::vector<svertex_t> &vertices) {
414 size_t nvertices = vertices.size();
415 bool modified_any_vertex =
false;
// Scan for any modified vertex.
416 for(
int i=0; i < (int)nvertices; i++) {
417 if (vertices[i].modified) {
418 modified_any_vertex =
true;
422 if (modified_any_vertex) {
423 vertex_data_handler->save();
427 virtual void load_after_updates(std::vector<svertex_t> &vertices) {
// Writes one CSV line (iteration, nupdates, work, delta) to "<prefix><base_filename>.deltalog".
// The file is truncated on iteration 0 and appended to on later iterations.
// NOTE(review): some original lines of this function are absent from this listing; closing
// of the file is not visible here.
431 virtual void write_delta_log() {
433 std::string deltafname = iomgr->multiplexprefix(0) + base_filename +
".deltalog";
// NOTE(review): no check of the fopen result is visible before df is used.
434 FILE * df = fopen(deltafname.c_str(), (chicontext.iteration == 0 ?
"w" :
"a"));
// NOTE(review): %lu assumes nupdates and work are unsigned long; their declarations are not
// visible here - confirm (size_t would call for %zu).
435 fprintf(df,
"%d,%lu,%lu,%lf\n", chicontext.iteration, nupdates, work, chicontext.get_delta());
441 virtual std::pair<vid_t, vid_t> get_interval(
int i) {
445 vid_t get_interval_start(
int i) {
446 return get_interval(i).first;
449 vid_t get_interval_end(
int i) {
450 return get_interval(i).second;
453 virtual size_t num_vertices() {
454 return 1 + intervals[nshards - 1].second;
457 graphchi_context &get_context() {
461 size_t num_updates() {
472 virtual size_t num_buffered_edges() {
480 if (only_adjacency) {
482 logstream(
LOG_ERROR) <<
"Asked number of edges, but engine was run without edge-data." << std::endl;
// Scheduled-vertex scan fragment (the enclosing function header and the loop's result
// statements are not visible in this listing).
// Without a scheduler the answer is always true.
494 if (scheduler == NULL)
return true;
// Otherwise each vertex in the inclusive range [st, en] is checked against the scheduler.
495 for(vid_t v=st; v<=en; v++) {
496 if (scheduler->is_scheduled(v)) {
503 virtual void initialize_iter() {
507 virtual void initialize_before_run() {
// Factory hook that allocates the memory shard for the interval currently being executed
// (exec_interval), using that shard's edge-data and adjacency file names.
//   interval_st, interval_en - bounds of the interval; how they are passed on is not
//                              visible, as the rest of the argument list is absent here
511 virtual memshard_t * create_memshard(vid_t interval_st, vid_t interval_en) {
512 return new memshard_t(this->iomgr,
513 filename_shard_edata<EdgeDataType>(base_filename, exec_interval, nshards),
514 filename_shard_adj(base_filename, exec_interval, nshards),
// Main execution routine fragment (the function header is not visible in this listing;
// userprogram and niters come from it). For each iteration it walks every shard interval,
// splits the interval into sub-intervals, loads the data, runs the updates and writes the
// results back.
// NOTE(review): many original lines are absent from this listing, so brace nesting and some
// conditions cannot be confirmed from here.
// Start the overall runtime timer and create the degree handler.
526 m.start_time(
"runtime");
527 degree_handler = create_degree_handler();
530 logstream(
LOG_INFO) <<
"GraphChi starting" << std::endl;
531 logstream(
LOG_INFO) <<
"Licensed under the Apache License 2.0" << std::endl;
532 logstream(
LOG_INFO) <<
"Copyright Aapo Kyrola et al., Carnegie Mellon University (2012)" << std::endl;
// One-time setup: subclass hook, sliding shards, scheduler.
536 initialize_before_run();
540 initialize_sliding_shards();
541 initialize_scheduler();
// ---- Iteration loop ----
550 for(iter=0; iter < niters; iter++) {
551 logstream(
LOG_INFO) <<
"Start iteration: " << iter << std::endl;
556 vertex_data_handler->check_size(num_vertices());
// Publish the iteration state to the context handed to the user program.
559 chicontext.filename = base_filename;
560 chicontext.iteration = iter;
561 chicontext.num_iterations = niters;
562 chicontext.nvertices = num_vertices();
563 chicontext.scheduler = scheduler;
564 chicontext.execthreads = exec_threads;
565 chicontext.reset_deltas(exec_threads);
// With selective scheduling, check whether the scheduler has new tasks; the action taken
// after the "No new tasks" message is not visible here.
571 if (use_selective_scheduling) {
572 if (scheduler != NULL) {
573 if (!scheduler->has_new_tasks) {
574 logstream(
LOG_INFO) <<
"No new tasks to run!" << std::endl;
577 scheduler->has_new_tasks =
false;
// ---- Interval loop: one pass per shard ----
582 for(exec_interval=0; exec_interval < nshards; ++exec_interval) {
584 vid_t interval_st = get_interval_start(exec_interval);
585 vid_t interval_en = get_interval_end(exec_interval);
// Flush this interval's sliding shard and wait for writes before (re)creating the
// memory shard.
590 sliding_shards[exec_interval]->flush();
591 iomgr->wait_for_writes();
594 if (memoryshard != NULL)
delete memoryshard;
595 memoryshard = create_memshard(interval_st, interval_en);
596 memoryshard->only_adjacency = only_adjacency;
599 sub_interval_st = interval_st;
600 logstream(
LOG_INFO) << chicontext.runtime() <<
"s: Starting: "
601 << sub_interval_st <<
" -- " << interval_en << std::endl;
// ---- Sub-interval loop ----
// NOTE(review): the loop uses a strict '<' and the assert below a strict '>', so a window
// whose start equals its end (a single vertex) is neither entered nor accepted - confirm
// this is intended.
603 while (sub_interval_st < interval_en) {
// Tail of the call that determines sub_interval_en; the budget is passed in bytes.
608 membudget_mb * 1024 * 1024);
609 assert(sub_interval_en > sub_interval_st);
611 logstream(
LOG_INFO) <<
"Iteration " << iter <<
"/" << (niters - 1) <<
", subinterval: " << sub_interval_st <<
" - " << sub_interval_en << std::endl;
// Skip the window entirely when nothing in it is scheduled.
614 if (!any_vertex_scheduled) {
615 logstream(
LOG_INFO) <<
"No vertices scheduled, skip." << std::endl;
616 sub_interval_st = sub_interval_en + 1;
// Build the vertex objects for the inclusive window.
621 int nvertices = sub_interval_en - sub_interval_st + 1;
623 std::vector<svertex_t> vertices(nvertices, svertex_t());
624 init_vertices(vertices, edata);
// Clear the window's scheduler entries before running its updates.
627 if (scheduler != NULL)
628 scheduler->remove_tasks(sub_interval_st, sub_interval_en);
631 load_before_updates(vertices);
634 logstream(
LOG_INFO) <<
"Start updates" << std::endl;
636 exec_updates(userprogram, vertices);
637 logstream(
LOG_INFO) <<
"Finished updates" << std::endl;
640 load_after_updates(vertices);
643 save_vertices(vertices);
// Advance to the next window.
645 sub_interval_st = sub_interval_en + 1;
// After the interval: commit the memory shard and let the interval's sliding shard
// continue from the position reported by the memory shard.
654 if (memoryshard->loaded()) {
655 logstream(
LOG_INFO) <<
"Commit memshard" << std::endl;
657 memoryshard->
commit(modifies_inedges);
658 sliding_shards[exec_interval]->set_offset(memoryshard->offset_for_stream_cont(), memoryshard->offset_vid_for_stream_cont(),
659 memoryshard->edata_ptr_for_stream_cont());
// Flush every sliding shard and reset its offsets to the beginning.
673 for(
int p=0; p<nshards; p++) {
674 sliding_shards[p]->flush();
675 sliding_shards[p]->set_offset(0, 0, 0);
677 iomgr->wait_for_writes();
// A non-negative chicontext.last_iteration shortens the run to end after that iteration.
683 if (chicontext.last_iteration >= 0) {
684 niters = chicontext.last_iteration + 1;
685 logstream(
LOG_DEBUG) <<
"Last iteration is now: " << (niters-1) << std::endl;
687 iteration_finished();
// ---- Wrap-up: commit preloaded data, stop the timer, record summary metrics ----
691 iomgr->commit_preloaded();
693 m.stop_time(
"runtime");
695 m.set(
"updates", nupdates);
697 m.set(
"nvertices", num_vertices());
698 m.set(
"execthreads", (
size_t)exec_threads);
699 m.set(
"loadthreads", (
size_t)load_threads);
700 m.set(
"scheduler", (
size_t)use_selective_scheduling);
705 virtual void iteration_finished() {
709 stripedio * get_iomanager() {
713 virtual void set_modifies_inedges(
bool b) {
714 modifies_inedges = b;
717 virtual void set_modifies_outedges(
bool b) {
718 modifies_outedges = b;
721 virtual void set_only_adjacency(
bool b) {
731 blocksize = blocksize_in_bytes;
744 void set_load_threads(
int lt) {
748 void set_exec_threads(
int et) {
757 enable_deterministic_parallelism = b;
// Intervals-file loader fragment (the enclosing function header and the statements that
// read st/en are not visible in this listing).
// Reads nshards intervals from the intervals file, appends them to `intervals`, then logs them.
// NOTE(review): partstr's declaration is not visible; sprintf does no bounds checking, so
// confirm the buffer is large enough for "." plus any int.
767 sprintf(partstr,
".%d", nshards);
769 std::string intervalsFilename = filename_intervals(base_filename, nshards);
770 std::ifstream intervalsF(intervalsFilename.c_str());
// An unreadable file is logged and then asserted on.
// NOTE(review): assert is compiled out under NDEBUG; no other handling is visible here.
772 if (!intervalsF.good()) {
773 logstream(
LOG_ERROR) <<
"Could not load intervals-file: " << intervalsFilename << std::endl;
775 assert(intervalsF.good());
// One (st, en) pair per shard.
780 for(
int i=0; i < nshards; i++) {
781 assert(!intervalsF.eof());
783 intervals.push_back(std::pair<vid_t,vid_t>(st, en));
// Log the loaded intervals.
786 for(
int i=0; i < nshards; i++) {
787 logstream(
LOG_INFO) <<
"shard: " << intervals[i].first <<
" - " << intervals[i].second << std::endl;
794 std::map<std::string, std::string> json_params;
// Stores `value` under `key` in json_params, replacing any existing entry for that key.
// NOTE(review): an original line between the two shown here is absent from this listing.
802 void set_json(std::string key, std::string value) {
804 json_params[key] = value;
808 template <
typename T>
809 void set_json(std::string key, T val) {
810 std::stringstream ss;
// Builds a JSON-formatted status string describing the engine: file name, shard count,
// iteration progress, runtime, update count, vertex count, current interval and window,
// followed by a "shards" array with one entry per sliding shard.
// NOTE(review): the opening/closing of the JSON object and the return statement are not
// visible in this listing.
815 std::string get_info_json() {
816 std::stringstream json;
// NOTE(review): base_filename is written without JSON escaping.
818 json <<
"\"file\" : \"" << base_filename <<
"\",\n";
819 json <<
"\"numOfShards\": " << nshards <<
",\n";
820 json <<
"\"iteration\": " << chicontext.iteration <<
",\n";
821 json <<
"\"numIterations\": " << chicontext.num_iterations <<
",\n";
822 json <<
"\"runTime\": " << chicontext.runtime() <<
",\n";
824 json <<
"\"updates\": " << nupdates <<
",\n";
825 json <<
"\"nvertices\": " << chicontext.nvertices <<
",\n";
826 json <<
"\"interval\":" << exec_interval <<
",\n";
827 json <<
"\"windowStart\":" << sub_interval_st <<
",";
828 json <<
"\"windowEnd\": " << sub_interval_en <<
",";
829 json <<
"\"shards\": [";
// One array element per shard, comma-separated; each carries its index "p" followed by
// the shard's own get_info_json() output.
831 for(
int p=0; p < (int)nshards; p++) {
832 if (p>0) json <<
",";
835 json <<
"\"p\": " << p <<
", ";
836 json << sliding_shards[p]->get_info_json();