35 #ifndef GRAPHCHI_SHARDER_DEF
36 #define GRAPHCHI_SHARDER_DEF
49 #include "graphchi_types.hpp"
57 #include "util/qsort.hpp"
// Size in bytes (64 MiB) of the staging buffers used by the sharder.
// It is the malloc size of the preprocessing buffer and of the adjacency and
// edge-data buffers allocated during final shard processing. bwrite()
// flushes a buffer to its file descriptor once appending the next value
// would reach this size.
63 #define SHARDER_BUFSIZE (64 * 1024 * 1024)
// Passes made over the preprocessed edge file by execute_sharding().
// execute_sharding() loops `phase` over the plain ints 1..2 and passes the raw
// int to start_phase(), so these explicit values must stay 1 and 2.
//   COMPUTE_INTERVALS: receive_edge() counts edges per destination-vertex
//                      chunk; ending this phase calls compute_partitionintervals().
//   SHOVEL:            presumably the phase in which receive_edge() appends
//                      each edge to the shovel buffer of the shard whose
//                      interval contains its destination (the case label is
//                      not visible here — confirm).
65 enum ProcPhase { COMPUTE_INTERVALS=1, SHOVEL=2 };
68 template <
typename EdgeDataType>
74 edge_with_value(vid_t src, vid_t dst, EdgeDataType value) : src(src), dst(dst), value(value) {}
76 bool stopper() {
return src == 0 && dst == 0; }
79 template <
typename EdgeDataType>
81 if (a.src == b.src)
return a.dst < b.dst;
85 template <
typename EdgeDataType>
91 std::string basefilename;
100 size_t bytes_preprocessed;
104 std::vector< std::pair<vid_t, vid_t> > intervals;
123 sharder(std::string basefilename) : basefilename(basefilename), m(
"sharder") {
127 edgedatasize =
sizeof(EdgeDataType);
132 if (prebuf != NULL) free(prebuf);
136 std::string preprocessed_name() {
137 std::stringstream ss;
139 ss <<
"." <<
sizeof(EdgeDataType) <<
"B.bin";
148 int f = open(preprocessed_name().c_str(), O_RDONLY);
161 if (prebuf != NULL) {
162 logstream(
LOG_FATAL) <<
"start_preprocessing() already called! Aborting." << std::endl;
165 m.start_time(
"preprocessing");
166 assert(prebuf == NULL);
167 std::string tmpfilename = preprocessed_name() +
".tmp";
168 binfile_fd = open(tmpfilename.c_str(), O_WRONLY | O_CREAT, S_IROTH | S_IWOTH | S_IWUSR | S_IRUSR);
169 if (binfile_fd < 0) {
170 logstream(
LOG_ERROR) <<
"Could not create file: " << tmpfilename <<
" error: " << strerror(errno) << std::endl;
172 assert(binfile_fd >= 0);
175 prebuf = (
char*) malloc(SHARDER_BUFSIZE);
177 assert(prebuf != NULL);
179 logstream(
LOG_INFO) <<
"Started preprocessing: " << basefilename <<
" --> " << tmpfilename << std::endl;
182 bwrite(binfile_fd, prebuf, prebufptr, (vid_t)0);
184 bytes_preprocessed = 0;
191 assert(prebuf != NULL);
194 writea(binfile_fd, prebuf, prebufptr - prebuf);
195 bytes_preprocessed += prebufptr - prebuf;
196 logstream(
LOG_INFO) <<
"Preprocessed: " << float(bytes_preprocessed / 1024. / 1024.) <<
" MB." << std::endl;
199 assert(max_vertex_id > 0);
200 logstream(
LOG_INFO) <<
"Maximum vertex id: " << max_vertex_id << std::endl;
201 ssize_t written = pwrite(binfile_fd, &max_vertex_id,
sizeof(vid_t), 0);
202 if (written !=
sizeof(vid_t)) {
203 logstream(
LOG_ERROR) <<
"Error when writing preprocessed file: " << strerror(errno) << std::endl;
205 assert(written ==
sizeof(vid_t));
213 std::string tmpfilename = preprocessed_name() +
".tmp";
214 rename(tmpfilename.c_str(), preprocessed_name().c_str());
217 logstream(
LOG_INFO) <<
"Finished preprocessing: " << basefilename <<
" --> " << preprocessed_name() << std::endl;
218 m.stop_time(
"preprocessing");
225 if (prebuf == NULL) {
226 logstream(
LOG_FATAL) <<
"You need to call start_preprocessing() prior to adding any edges!" << std::endl;
228 assert(prebuf != NULL);
230 bwrite(binfile_fd, prebuf, prebufptr,
edge_t(from, to, val));
231 max_vertex_id = std::max(std::max(from, to), max_vertex_id);
235 template <
typename T>
236 void bwrite(
int f,
char * buf,
char * &bufptr, T val) {
237 if (bufptr +
sizeof(T) - buf >= SHARDER_BUFSIZE) {
238 writea(f, buf, bufptr - buf);
242 bytes_preprocessed += SHARDER_BUFSIZE;
243 logstream(
LOG_INFO)<<
"Preprocessed: " << float(bytes_preprocessed / 1024. / 1024.) <<
" MB." << std::endl;
259 m.start_time(
"execute_sharding");
262 size_t blocksize = 32 * 1024 * 1024;
263 while (blocksize %
sizeof(
edge_t)) blocksize++;
265 char * block = (
char*) malloc(blocksize);
266 size_t total_to_process = get_filesize(preprocessed_name());
268 for(
int phase=1; phase <= 2; ++phase) {
270 FILE * inf = fopen(preprocessed_name().c_str(),
"r");
272 logstream(
LOG_FATAL) <<
"Could not open preprocessed file: " << preprocessed_name() <<
273 " error: " << strerror(errno) << std::endl;
278 ssize_t rd = fread(&max_vertex_id,
sizeof(vid_t), 1, inf);
280 logstream(
LOG_ERROR) <<
"Read: " << rd << std::endl;
281 logstream(
LOG_ERROR) <<
"Could not read from preprocessed file. Error: " << strerror(errno) << std::endl;
284 logstream(
LOG_INFO) <<
"Max vertex id: " << max_vertex_id << std::endl;
286 this->start_phase(phase);
291 while(len < blocksize) {
292 int a = (int) fread(block + len, 1, blocksize - len, inf);
298 logstream(
LOG_DEBUG) <<
"Phase: " << phase <<
" read:"
299 << (totread * 1.0 / total_to_process * 100) <<
"%" << std::endl;
303 for(
size_t i=0; i<len; i++) {
304 this->receive_edge(ptr[i].src, ptr[i].dst, ptr[i].value);
306 }
while (!feof(inf));
319 m.stop_time(
"execute_sharding");
335 if (nshards_string.find(
"auto") != std::string::npos || nshards_string ==
"0") {
336 logstream(
LOG_INFO) <<
"Determining number of shards automatically." << std::endl;
338 int membudget_mb = get_option_int(
"membudget_mb", 1024);
339 logstream(
LOG_INFO) <<
"Assuming available memory is " << membudget_mb <<
" megabytes. " << std::endl;
340 logstream(
LOG_INFO) <<
" (This can be defined with configuration parameter 'membudget_mb')" << std::endl;
342 bytes_preprocessed = get_filesize(preprocessed_name()) -
sizeof(vid_t);
343 assert(bytes_preprocessed > 0);
344 size_t numedges = bytes_preprocessed /
sizeof(
edge_t);
346 double max_shardsize = membudget_mb * 1024. * 1024. / 8;
347 logstream(
LOG_INFO) <<
"Determining maximum shard size: " << (max_shardsize / 1024. / 1024.) <<
" MB." << std::endl;
349 nshards = (int) ( 2 + (numedges *
sizeof(EdgeDataType) / max_shardsize) + 0.5);
351 nshards = atoi(nshards_string.c_str());
354 logstream(
LOG_INFO) <<
"Number of shards to be created: " << nshards << std::endl;
358 void compute_partitionintervals() {
359 size_t edges_per_part = nedges / nshards + 1;
361 logstream(
LOG_INFO) <<
"Number of shards: " << nshards << std::endl;
362 logstream(
LOG_INFO) <<
"Edges per shard: " << edges_per_part << std::endl;
363 logstream(
LOG_INFO) <<
"Max vertex id: " << max_vertex_id << std::endl;
366 size_t edgecounter=0;
367 std::string fname = filename_intervals(basefilename, nshards);
368 FILE * f = fopen(fname.c_str(),
"w");
371 logstream(
LOG_ERROR) <<
"Could not open file: " << fname <<
" error: " <<
372 strerror(errno) << std::endl;
377 while(nshards > (
int) intervals.size()) {
379 edgecounter += edgecounts[i / vertexchunk];
380 if (edgecounter >= edges_per_part || (i >= max_vertex_id)) {
381 intervals.push_back(std::pair<vid_t,vid_t>(cur_st, i + (i >= max_vertex_id)));
382 logstream(
LOG_INFO) <<
"Interval: " << cur_st <<
" - " << i << std::endl;
383 fprintf(f,
"%u\n", i + (i == max_vertex_id));
389 assert(nshards == (
int)intervals.size());
391 logstream(
LOG_INFO) <<
"Computed intervals." << std::endl;
394 std::string shovel_filename(
int shard) {
395 std::stringstream ss;
396 ss << basefilename << shard <<
"." << nshards <<
".shovel";
400 void start_phase(
int p) {
403 logstream(
LOG_INFO) <<
"Starting phase: " << phase << std::endl;
405 case COMPUTE_INTERVALS:
410 vertexchunk = (int) (max_vertex_id *
sizeof(
int) / (1024 * 1024 * get_option_long(
"membudget_mb", 1024)));
411 if (vertexchunk<1) vertexchunk = 1;
412 edgecounts = (
int*)calloc( max_vertex_id / vertexchunk + 1,
sizeof(
int));
417 shovelfs =
new int[nshards];
418 bufs =
new edge_t*[nshards];
419 bufptrs =
new int[nshards];
420 bufsize = (1024 * 1024 * get_option_long(
"membudget_mb", 1024)) / nshards / 4;
421 while(bufsize %
sizeof(edge_t) != 0) bufsize++;
423 logstream(
LOG_DEBUG)<<
"Shoveling bufsize: " << bufsize << std::endl;
425 for(
int i=0; i < nshards; i++) {
426 std::string fname = shovel_filename(i);
427 shovelfs[i] = open(fname.c_str(), O_WRONLY | O_CREAT, S_IROTH | S_IWOTH | S_IWUSR | S_IRUSR);
428 if (shovelfs[i] < 0) {
429 logstream(
LOG_ERROR) <<
"Could not create a temporary file " << fname <<
430 " error: " << strerror(errno) << std::endl;
432 assert(shovelfs[i] >= 0);
433 bufs[i] = (edge_t*) malloc(bufsize);
441 logstream(
LOG_INFO) <<
"Ending phase: " << phase << std::endl;
443 case COMPUTE_INTERVALS:
444 compute_partitionintervals();
449 for(
int i=0; i<nshards; i++) {
450 writea(shovelfs[i], bufs[i],
sizeof(edge_t) * (bufptrs[i]));
463 void swrite(
int shard, edge_t et) {
464 bufs[shard][bufptrs[shard]++] = et;
465 if (bufptrs[shard] *
sizeof(edge_t) >= bufsize) {
466 writea(shovelfs[shard], bufs[shard],
sizeof(edge_t) * bufptrs[shard]);
472 void receive_edge(vid_t from, vid_t to, EdgeDataType value) {
474 logstream(
LOG_WARNING) <<
"Tried to add self-edge " << from <<
"->" << to << std::endl;
477 if (from > max_vertex_id || to > max_vertex_id) {
478 logstream(
LOG_ERROR) <<
"Tried to add an edge with too large from/to values. From:" <<
479 from <<
" to: "<< to <<
" max: " << max_vertex_id << std::endl;
483 case COMPUTE_INTERVALS:
484 edgecounts[to / vertexchunk]++;
489 for(
int i=0; i < nshards; i++) {
490 int shard = (lastpart + i) % nshards;
491 if (to >= intervals[shard].first && to <= intervals[shard].second) {
492 swrite(shard, edge_t(from, to, value));
499 logstream(
LOG_ERROR) <<
"Shard not found for : " << to << std::endl;
514 for(
int shard=0; shard < nshards; shard++) {
515 logstream(
LOG_INFO) <<
"Starting final processing for shard: " << shard << std::endl;
517 std::string shovelfname = shovel_filename(shard);
518 std::string fname = filename_shard_adj(basefilename, shard, nshards);
519 std::string edfname = filename_shard_edata<EdgeDataType>(basefilename, shard, nshards);
522 int shovelf = open(shovelfname.c_str(), O_RDONLY);
523 size_t shovelsize = readfull(shovelf, (
char**) &shovelbuf);
524 size_t numedges = shovelsize /
sizeof(
edge_t);
526 logstream(
LOG_DEBUG) <<
"Shovel size:" << shovelsize <<
" edges: " << numedges << std::endl;
528 quickSort(shovelbuf, (
int)numedges, edge_t_src_less<EdgeDataType>);
531 int f = open(fname.c_str(), O_WRONLY | O_CREAT, S_IROTH | S_IWOTH | S_IWUSR | S_IRUSR);
533 logstream(
LOG_ERROR) <<
"Could not open " << fname <<
" error: " << strerror(errno) << std::endl;
536 int trerr = ftruncate(f, 0);
540 int ef = open(edfname.c_str(), O_WRONLY | O_CREAT, S_IROTH | S_IWOTH | S_IWUSR | S_IRUSR);
542 logstream(
LOG_ERROR) <<
"Could not open " << edfname <<
" error: " << strerror(errno) << std::endl;
547 char * buf = (
char*) malloc(SHARDER_BUFSIZE);
549 char * ebuf = (
char*) malloc(SHARDER_BUFSIZE);
550 char * ebufptr = ebuf;
554 for(
size_t i=0; i <= numedges; i++) {
556 bwrite<EdgeDataType>(ef, ebuf, ebufptr, EdgeDataType(edge.value));
558 if ((edge.src != curvid)) {
560 size_t count = i - istart;
561 assert(count>0 || curvid==0);
564 uint8_t x = (uint8_t)count;
565 bwrite<uint8_t>(f, buf, bufptr, x);
567 bwrite<uint8_t>(f, buf, bufptr, 0xff);
568 bwrite<uint32_t>(f, buf, bufptr,(uint32_t)count);
572 for(
size_t j=istart; j<i; j++) {
573 bwrite(f, buf, bufptr, shovelbuf[j].dst);
579 if (!edge.stopper()) {
580 if (edge.src - curvid > 1 || (i == 0 && edge.src>0)) {
581 int nz = edge.src-curvid-1;
582 if (i == 0 && edge.src>0) nz = edge.src;
584 bwrite<uint8_t>(f, buf, bufptr, 0);
586 int tnz = std::min(254, nz);
587 bwrite<uint8_t>(f, buf, bufptr, (uint8_t) tnz);
597 writea(f, buf, bufptr - buf);
603 writea(ef, ebuf, ebufptr - ebuf);
607 remove(shovelfname.c_str());
611 create_degree_file();
615 typedef char dummy_t;
621 void create_degree_file() {
624 std::vector<slidingshard_t * > sliding_shards;
626 int subwindow = 5000000;
627 std::cout <<
"Subwindow : " << subwindow << std::endl;
628 m.set(
"subwindow", (
size_t)subwindow);
632 m.start_time(
"degrees.runtime");
635 int blocksize = get_option_int(
"blocksize", 1024 * 1024);
637 for(
int p=0; p < nshards; p++) {
638 std::cout <<
"Initialize streaming shard: " << p << std::endl;
639 sliding_shards.push_back(
640 new slidingshard_t(iomgr, filename_shard_edata<EdgeDataType>(basefilename, p, nshards),
641 filename_shard_adj(basefilename, p, nshards), intervals[p].first,
643 blocksize, m,
true));
645 for(
int p=0; p < nshards; p++) sliding_shards[p]->only_adjacency =
true;
647 graphchi_context ginfo;
648 ginfo.nvertices = 1 + intervals[nshards - 1].second;
649 ginfo.scheduler = NULL;
652 std::string outputfname = filename_degree_data(basefilename);
654 int degreeOutF = open(outputfname.c_str(), O_RDWR | O_CREAT, S_IROTH | S_IWOTH | S_IWUSR | S_IRUSR);
655 if (degreeOutF < 0) {
656 logstream(
LOG_ERROR) <<
"Could not create: " << degreeOutF << std::endl;
658 assert(degreeOutF >= 0);
659 int trerr = ftruncate(degreeOutF, ginfo.nvertices *
sizeof(
int) * 2);
662 for(
int window=0; window<nshards; window++) {
663 metrics_entry mwi = m.start_time();
665 vid_t interval_st = intervals[window].first;
666 vid_t interval_en = intervals[window].second;
669 sliding_shards[window]->flush();
672 memshard_t memshard(iomgr, filename_shard_edata<EdgeDataType>(basefilename, window, nshards), filename_shard_adj(basefilename, window, nshards),
673 interval_st, interval_en, m);
674 memshard.only_adjacency =
true;
675 logstream(
LOG_INFO) <<
"Interval: " << interval_st <<
" " << interval_en << std::endl;
677 for(vid_t subinterval_st=interval_st; subinterval_st < interval_en; ) {
678 vid_t subinterval_en = std::min(interval_en, subinterval_st + subwindow);
679 logstream(
LOG_INFO) <<
"(Degree proc.) Sub-window: [" << subinterval_st <<
" - " << subinterval_en <<
"]" << std::endl;
680 assert(subinterval_en >= subinterval_st && subinterval_en <= interval_en);
683 metrics_entry men = m.start_time();
684 int nvertices = subinterval_en-subinterval_st+1;
685 std::vector< graphchi_vertex<int, dummy_t> > vertices(nvertices, graphchi_vertex<int, dummy_t>());
688 for(
int i=0; i < nvertices; i++) {
689 vertices[i] = graphchi_vertex<int, dummy_t>(subinterval_st + i, NULL, NULL, 0, 0);
690 vertices[i].scheduled =
true;
693 metrics_entry me = m.start_time();
694 omp_set_num_threads(loadthreads);
695 #pragma omp parallel for
696 for(
int p=-1; p < nshards; p++) {
699 if (memshard.loaded() ==
false) {
704 memshard.load_vertices(subinterval_st, subinterval_en, vertices);
708 sliding_shards[p]->read_next_vertices(nvertices, subinterval_st, vertices,
false);
713 m.stop_time(me,
"stream_ahead", window,
true);
716 metrics_entry mev = m.start_time();
719 int * vbuf = (
int*) malloc(nvertices*
sizeof(
int)*2);
721 for(
int i=0; i<nvertices; i++) {
722 vbuf[2 * i] = vertices[i].num_inedges();
723 vbuf[2 * i +1] = vertices[i].num_outedges();
725 pwritea(degreeOutF, vbuf, nvertices *
sizeof(
int) * 2, subinterval_st *
sizeof(
int) * 2);
730 subinterval_st = subinterval_en+1;
733 sliding_shards[window]->set_offset(memshard.offset_for_stream_cont(), memshard.offset_vid_for_stream_cont(),
734 memshard.edata_ptr_for_stream_cont());
737 m.stop_time(
"degrees.runtime");