29 #ifndef DEF_GRAPHCHI_MEMSHARD
30 #define DEF_GRAPHCHI_MEMSHARD
45 #include "graphchi_types.hpp"
// Template parameters of the shard type (the class header line itself is not
// part of this excerpt):
//   VT, ET     - forwarded to the default vertex type graphchi_vertex<VT, ET>;
//                ET is also the element type the edge-data buffer is cast to.
//   svertex_t  - vertex type stored in the preallocated vertex vector.
//   ETspecial  - defaults to ET; load_vertices() compares sizeof(ET) with
//                sizeof(ETspecial) to decide whether edge targets are passed
//                through translate_edge().
51 template <
typename VT,
typename ET,
typename svertex_t = graphchi_vertex<VT, ET>,
typename ETspecial = ET>
// File opened for the edge-data session; its size is stored in edatafilesize.
56 std::string filename_edata;
// File opened for the adjacency session; its size is stored in adjfilesize.
57 std::string filename_adj;
// Vertex id recorded by load_vertices() at the first vertex whose id is
// greater than range_end.
65 vid_t streaming_offset_vid;
// Byte offset into the adjacency buffer at that same vertex.
66 size_t streaming_offset;
// Byte offset into the adjacency buffer of the first vertex with
// id >= range_st; load_vertices() initialises it to adjfilesize.
67 size_t range_start_offset;
// Edge-data offset matching range_start_offset; initialised to edatafilesize.
68 size_t range_start_edge_ptr;
// Edge-data offset matching streaming_offset; commit code uses it as the end
// of the partial write-back range.
69 size_t streaming_offset_edge_ptr;
// Tail of a constructor signature plus its member-initializer list (the start
// of the signature is outside this excerpt). The initializers store the two
// file names, the interval bounds (range_st / range_end) and the metrics
// reference in the corresponding members.
// NOTE(review): `iomgr(iomgr)` presumably initialises the member from a
// parameter of the same name declared on a line not shown here - confirm.
85 std::string _filename_edata,
86 std::string _filename_adj,
89 metrics &_m) : iomgr(iomgr), filename_edata(_filename_edata),
90 filename_adj(_filename_adj),
91 range_st(_range_start), range_end(_range_end), m(_m) {
// Start with edge data enabled: the loading code only opens the edge-data
// session while only_adjacency is false.
94 only_adjacency =
false;
// Teardown of the two I/O sessions. A session id >= 0 is treated as "opened":
// its managed buffer is released first if it is still non-NULL, then the
// session itself is closed.
// Edge-data session.
102 if (edata_iosession >= 0) {
103 if (edgedata != NULL) iomgr->managed_release(edata_iosession, &edgedata);
104 iomgr->close_session(edata_iosession);
// Adjacency session.
106 if (adj_session >= 0) {
107 if (adjdata != NULL) iomgr->managed_release(adj_session, &adjdata);
108 iomgr->close_session(adj_session);
// Write-back of the edge-data buffer, timed under the "memshard_commit" key.
// Nothing is written when no edge data is held or the shard was loaded in
// adjacency-only mode.
113 if (edgedata == NULL || only_adjacency)
return;
// Variant 1: write the whole edge-data buffer (edatafilesize bytes) at file
// offset 0. The condition selecting between this and the partial write below
// is on lines not present in this excerpt.
124 iomgr->managed_pwritea_now(edata_iosession, &edgedata, edatafilesize, 0);
// Variant 2: write only the bytes between range_start_edge_ptr and `last`.
// `last` starts at streaming_offset_edge_ptr and can be replaced by the file
// size; the guard for that replacement is not visible here.
126 size_t last = streaming_offset_edge_ptr;
129 last = edatafilesize;
// bufp points at the first byte of the range inside the in-memory buffer; the
// same offset is used as the destination offset of the write.
131 char * bufp = ((
char*)edgedata + range_start_edge_ptr);
132 iomgr->managed_pwritea_now(edata_iosession, &bufp, last - range_start_edge_ptr, range_start_edge_ptr);
135 m.stop_time(cm,
"memshard_commit");
// Release both managed buffers once the write-back has been issued.
137 iomgr->managed_release(adj_session, &adjdata);
138 iomgr->managed_release(edata_iosession, &edgedata);
// Loading of the shard's two files into managed buffers.
// The file sizes determine the size of each buffer allocated below.
149 adjfilesize = get_filesize(filename_adj);
150 edatafilesize = get_filesize(filename_edata);
// Edge data may be read asynchronously only when a default-constructed vertex
// reports that it has no computational edges.
152 bool async_inedgedata_loading = !svertex_t().computational_edges();
// Builds with SUPPORT_DELETIONS always read the edge data synchronously.
154 #ifdef SUPPORT_DELETIONS
155 async_inedgedata_loading =
false;
// Adjacency file: open a session, allocate a buffer for the whole file and
// start a stream reader that fills it; progress is later polled through
// adj_stream_session.curpos.
// NOTE(review): the second argument of open_session (true here, false for the
// edge data) is presumably a read-only flag - confirm against its declaration.
161 adj_session = iomgr->open_session(filename_adj,
true);
162 iomgr->managed_malloc(adj_session, &adjdata, adjfilesize, 0);
163 adj_stream_session = streaming_task(iomgr, adj_session, adjfilesize, (
char**) &adjdata);
165 iomgr->launch_stream_reader(&adj_stream_session);
// Edge-data file: skipped entirely in adjacency-only mode.
167 if (!only_adjacency) {
168 edata_iosession = iomgr->open_session(filename_edata,
false);
170 iomgr->managed_malloc(edata_iosession, &edgedata, edatafilesize, 0);
// Read the whole file either asynchronously or immediately, depending on the
// flag computed above.
171 if (async_inedgedata_loading) {
172 iomgr->managed_preada_async(edata_iosession, &edgedata, edatafilesize, 0);
174 iomgr->managed_preada_now(edata_iosession, &edgedata, edatafilesize, 0);
// Blocks the caller until the adjacency stream reader has progressed to at
// least byte `pos + toread`, or has reached the end of the adjacency file.
//   toread - number of bytes the caller is about to consume
//   pos    - caller's current byte offset into the adjacency buffer
// NOTE(review): `toread` is a signed int that is added to a size_t; a negative
// or overflowed argument is converted to unsigned before the comparison -
// confirm callers never pass values outside the int range.
179 inline void check_stream_progress(
int toread,
size_t pos) {
// Fast path: the whole file has already been streamed in.
180 if (adj_stream_session.curpos == adjfilesize)
return;
// Loop until the reader has passed the requested position (the statement that
// waits between polls is on a line not present in this excerpt).
182 while(adj_stream_session.curpos < toread+pos) {
// End of file reached while waiting: nothing more will arrive.
184 if (adj_stream_session.curpos == adjfilesize)
return;
// Walks the adjacency buffer and attaches edges to the preallocated vertices
// of the window [window_st, window_en]. While scanning it also records where
// the shard's own interval starts and where the first vertex beyond range_end
// is located. Timed under "memoryshard_create_edges".
//   window_st, window_en - inclusive vertex-id bounds of the window
//   prealloc             - vertices of the window, indexed by vid - window_st
//   inedges              - not referenced in the lines visible in this excerpt
//   outedges             - when false, no out-edges are added
// Several loop headers and branch conditions of this function are on lines
// that are missing from this excerpt.
188 void load_vertices(vid_t window_st, vid_t window_en, std::vector<svertex_t> & prealloc,
bool inedges=
true,
bool outedges=
true) {
191 m.start_time(
"memoryshard_create_edges");
193 assert(adjdata != NULL);
// ptr is the read cursor into the adjacency buffer; end is one past its last
// byte.
196 uint8_t * ptr = adjdata;
197 uint8_t * end = ptr + adjfilesize;
// Reset the markers. The range-start markers default to the file sizes, which
// is the value they keep if no vertex >= range_st is encountered.
201 streaming_offset = 0;
202 streaming_offset_vid = 0;
203 streaming_offset_edge_ptr = 0;
204 range_start_offset = adjfilesize;
205 range_start_edge_ptr = edatafilesize;
207 bool setoffset =
false;
208 bool setrangeoffset =
false;
// Make sure the next 6 bytes after the cursor have been streamed in before
// they are decoded.
210 check_stream_progress(6, ptr-adjdata);
// Record position, vertex id and edge-data offset when a vertex id beyond
// range_end is seen and setoffset is still false.
211 if (!setoffset && vid > range_end) {
214 streaming_offset = ptr-adjdata;
215 streaming_offset_vid = vid;
216 streaming_offset_edge_ptr = edgeptr;
// Same for the first vertex inside or beyond the start of the shard's range.
219 if (!setrangeoffset && vid>=range_st) {
220 range_start_offset = ptr-adjdata;
221 range_start_edge_ptr = edgeptr;
222 setrangeoffset =
true;
// Record-header decoding: the cursor is advanced over one-byte fields, and a
// 32-bit value is read into n (the conditions choosing between these steps
// are not visible here).
228 ptr +=
sizeof(uint8_t);
233 ptr +=
sizeof(uint8_t);
240 n = *((uint32_t*)ptr);
241 ptr +=
sizeof(uint32_t);
// vertex stays NULL unless the current vid lies in the window and the
// corresponding preallocated vertex is scheduled.
245 svertex_t* vertex = NULL;
247 if (vid>=window_st && vid <=window_en) {
248 vertex = &prealloc[vid-window_st];
249 if (!vertex->scheduled) vertex = NULL;
// Wait until the bytes for n entries have been streamed in.
// NOTE(review): the entry size is hard-coded as 4 here while the cursor is
// advanced by sizeof(vid_t) below - confirm the two always agree.
251 check_stream_progress(n*4, ptr-adjdata);
// Read the next target id. When ET and ETspecial differ in size the raw value
// goes through translate_edge(), which also receives special_edge.
253 bool special_edge =
false;
254 vid_t target = (
sizeof(ET)==
sizeof(ETspecial) ? *((vid_t*) ptr) : translate_edge(*((vid_t*) ptr), special_edge));
255 ptr +=
sizeof(vid_t);
// Out-edge of the current vertex; its value pointer refers to the edge-data
// buffer at byte offset edgeptr, or is NULL in adjacency-only mode.
258 if (vertex != NULL && outedges)
260 vertex->add_outedge(target, (only_adjacency ? NULL : (ET*) &((
char*)edgedata)[edgeptr]), special_edge);
// In-edge of the target, only when the target lies inside the window and is
// scheduled.
263 if (target >= window_st) {
264 if (target <= window_en) {
267 svertex_t & dstvertex = prealloc[target-window_st];
268 if (dstvertex.scheduled) {
269 assert(only_adjacency || edgeptr < edatafilesize);
270 dstvertex.add_inedge(vid, (only_adjacency ? NULL : (ET*) &((
char*)edgedata)[edgeptr]), special_edge);
// Both endpoints are selected vertices of this window: mark both as not
// parallel-safe.
271 if (vertex != NULL) {
272 dstvertex.parallel_safe =
false;
273 vertex->parallel_safe =
false;
277 }
else if (
sizeof(ET) ==
sizeof(ETspecial)) {
// Skip path, taken only when ET and ETspecial have the same size: if the
// source vertex is not selected, jump the cursor over the remaining n target
// ids and advance edgeptr by (n+1) edge values.
279 if (vertex == NULL) {
280 ptr +=
sizeof(vid_t)*n;
281 edgeptr += (n+1)*
sizeof(ET);
// Advance the edge-data offset by one value: sizeof(ETspecial) for a special
// edge, sizeof(ET) otherwise.
286 edgeptr +=
sizeof(ET) * !special_edge +
sizeof(ETspecial) * special_edge;
290 m.stop_time(
"memoryshard_create_edges",
false);
// Returns streaming_offset, the adjacency-buffer byte offset stored by
// load_vertices().
293 size_t offset_for_stream_cont() {
294 return streaming_offset;
// Returns streaming_offset_vid, the vertex id stored by load_vertices()
// together with streaming_offset.
296 vid_t offset_vid_for_stream_cont() {
297 return streaming_offset_vid;
// Returns streaming_offset_edge_ptr, the edge-data byte offset stored by
// load_vertices() together with streaming_offset.
299 size_t edata_ptr_for_stream_cont() {
300 return streaming_offset_edge_ptr;