30 #ifndef DEF_GRAPHCHI_SLIDINGSHARD
31 #define DEF_GRAPHCHI_SLIDINGSHARD
47 #include "graphchi_types.hpp"
65 sblock() : writedesc(0), readdesc(0), active(
false) { data = NULL; }
66 sblock(
int wdesc,
int rdesc) : writedesc(wdesc), readdesc(rdesc), active(
false) { data = NULL; }
69 if (active && data != NULL && writedesc >= 0) {
75 if (active && data != NULL && writedesc >= 0) {
76 size_t len = ptr-data;
77 if (len> end-offset) len = end-offset;
78 iomgr->managed_pwritea_now(writedesc, &data, len, offset);
82 iomgr->managed_preada_async(readdesc, &data, end-offset, offset);
85 iomgr->managed_preada_now(readdesc, &data, end-offset, offset);
89 if (data != NULL) iomgr->managed_release(readdesc, &data);
96 size_t adjoffset, edataoffset;
97 indexentry(
size_t a,
size_t e) : adjoffset(a), edataoffset(e) {}
104 template <
typename VT,
typename ET,
typename svertex_t = graphchi_vertex<VT, ET>,
typename ETspecial = ET>
109 std::string filename_edata;
110 std::string filename_adj;
111 vid_t range_st, range_end;
115 size_t adjoffset, edataoffset, adjfilesize, edatafilesize;
116 size_t window_start_edataoffset;
118 std::vector<sblock> activeblocks;
126 std::map<int, indexentry> sparse_index;
128 bool async_edata_loading;
129 bool need_read_outedges;
// Constructs a sliding shard over an edge-data file and an adjacency file.
// Parameters, as used by the visible initializers and body:
//   _filename_edata / _filename_adj - stored in filename_edata / filename_adj
//   _range_en        - stored in range_end
//   _blocksize       - stored in blocksize, then rounded up to a multiple of sizeof(ET)
//   _disable_writes  - stored in disable_writes (but see the note in the body)
//   onlyadj          - stored in only_adjacency
// NOTE(review): the handling of iomgr, _range_st and _m is not visible in this excerpt.
135 sliding_shard(
stripedio * iomgr, std::string _filename_edata, std::string _filename_adj, vid_t _range_st, vid_t _range_en,
size_t _blocksize,
metrics &_m,
136 bool _disable_writes=
false,
bool onlyadj =
false) :
138 filename_edata(_filename_edata),
139 filename_adj(_filename_adj),
141 range_end(_range_en),
142 blocksize(_blocksize),
144 disable_writes(_disable_writes) {
// NOTE(review): in the visible code this assignment overwrites the value just
// taken from _disable_writes, so the constructor argument has no lasting
// effect - confirm this is intended.
148 disable_writes =
false;
149 only_adjacency = onlyadj;
152 window_start_edataoffset = 0;
// Grow blocksize one byte at a time until it is a whole multiple of sizeof(ET).
154 while(blocksize %
sizeof(ET) != 0) blocksize++;
155 assert(blocksize %
sizeof(ET)==0);
// Cache the sizes of both files.
157 edatafilesize = get_filesize(filename_edata);
158 adjfilesize = get_filesize(filename_adj);
// NOTE(review): the condition guarding this if/else is not visible here.
// The else branch uses -1 to mean "no edge-data session".
161 edata_session = iomgr->open_session(filename_edata);
162 else edata_session = -1;
164 adjfile_session = iomgr->open_session(filename_adj,
true);
// Both loading flags are queried from a default-constructed svertex_t.
168 async_edata_loading = !svertex_t().computational_edges();
// Presumably forces synchronous edge-data loading when SUPPORT_DELETIONS is
// defined; the matching #endif is not visible in this excerpt.
169 #ifdef SUPPORT_DELETIONS
170 async_edata_loading =
false;
172 need_read_outedges = svertex_t().read_outedges();
177 if (curblock != NULL) {
178 curblock->release(iomgr);
182 if (curadjblock != NULL) {
183 curadjblock->release(iomgr);
188 if (edata_session>=0) iomgr->close_session(edata_session);
189 iomgr->close_session(adjfile_session);
194 return edatafilesize /
sizeof(ET);
198 size_t get_adjoffset() {
return adjoffset; }
199 size_t get_edataoffset() {
return edataoffset; }
204 sparse_index.insert(std::pair<int, indexentry>(-((
int)curvid),
indexentry(adjoffset, edataoffset)));
// Fast-forwards the shard position toward vertex v using sparse_index.
// Does nothing when curvid is already at or past v.
207 void move_close_to(vid_t v) {
208 if (curvid >= v)
return;
// sparse_index is keyed by negated vertex ids, so lower_bound(-v) yields the
// entry with the largest recorded vertex id that is <= v.
// NOTE(review): the iterator is dereferenced without a check against
// sparse_index.end(); presumably the index always holds a suitable entry - confirm.
// NOTE(review): vertex ids are cast to int here, so ids above INT_MAX would not
// survive the round trip.
210 std::map<int,indexentry>::iterator lowerbd_iter = sparse_index.lower_bound(-((
int)v));
211 int closest_vid = -((int)lowerbd_iter->first);
212 assert(closest_vid>=0);
213 indexentry closest_offset = lowerbd_iter->second;
214 assert(closest_vid <= (
int)v);
// Only jump when the indexed position is ahead of the current one.
215 if (closest_vid > (
int)curvid) {
217 <<
"Sliding shard, start: " << range_st <<
" moved to: " << closest_vid <<
" " << closest_offset.adjoffset <<
", asked for : " << v <<
" was in: curvid= " << curvid <<
" " << adjoffset << std::endl;
// Advance the in-block cursors by the same delta as the file offsets so that
// pointer and offset stay in step. No bounds check against the block is done here.
218 if (curblock != NULL)
219 curblock->ptr += closest_offset.edataoffset - edataoffset;
220 if (curadjblock != NULL)
221 curadjblock->ptr += closest_offset.adjoffset - adjoffset;
222 curvid = (vid_t)closest_vid;
223 adjoffset = closest_offset.adjoffset;
224 edataoffset = closest_offset.edataoffset;
// Ensures that curblock covers the next `toread` bytes of edge data starting at
// edataoffset. If there is no current block, or it ends before
// edataoffset + toread, a new block is created.
234 inline void check_curblock(
size_t toread) {
235 if (curblock == NULL || curblock->end < edataoffset+toread) {
// A block that was never marked active is released before being replaced.
236 if (curblock != NULL) {
237 if (!curblock->active) {
238 curblock->release(iomgr);
// The new block starts at the current edge-data offset and spans at most
// blocksize bytes, clamped to the end of the edge-data file.
242 sblock newblock(edata_session, edata_session);
243 newblock.offset = edataoffset;
244 newblock.end = std::min(edatafilesize, edataoffset+blocksize);
245 assert(newblock.end >= newblock.offset );
247 iomgr->managed_malloc(edata_session, &newblock.data, newblock.end - newblock.offset, newblock.offset);
248 newblock.ptr = newblock.data;
// The block is copied into activeblocks; curblock is then pointed at the stored
// copy (the last element), not at the local variable.
249 activeblocks.push_back(newblock);
250 curblock = &activeblocks[activeblocks.size()-1];
// Ensures that curadjblock covers the next `toread` bytes of adjacency data
// starting at adjoffset; otherwise releases the current block and loads a new one.
// NOTE(review): this test uses `<=` while check_curblock uses `<` for the
// equivalent comparison - confirm the difference is intentional.
254 inline void check_adjblock(
size_t toread) {
255 if (curadjblock == NULL || curadjblock->end <= adjoffset + toread) {
256 if (curadjblock != NULL) {
257 curadjblock->release(iomgr);
// NOTE(review): the declaration of newblock is not visible in this excerpt; it
// is accessed through a pointer.
// The new block starts at adjoffset and spans at most blocksize bytes, clamped
// to the end of the adjacency file.
262 newblock->offset = adjoffset;
263 newblock->end = std::min(adjfilesize, adjoffset+blocksize);
264 assert(newblock->end > 0);
265 assert(newblock->end >= newblock->offset);
266 iomgr->managed_malloc(adjfile_session, &newblock->data, newblock->end - newblock->offset, adjoffset);
267 newblock->ptr = newblock->data;
// Read the block contents via managed_preada_now; the read is reported to the
// metrics object under the "blockload" key.
269 iomgr->managed_preada_now(adjfile_session, &newblock->data, newblock->end - newblock->offset, adjoffset);
270 m.stop_time(me,
"blockload");
271 curadjblock = newblock;
// Reads one value of type U from the adjacency block at the current cursor and
// advances both adjoffset and the block pointer by sizeof(U).
// The value is read by casting the block pointer to U*.
275 template <
typename U>
276 inline U read_val() {
// Make sure the adjacency block covers the bytes about to be read.
277 check_adjblock(
sizeof(U));
278 U res = *((U*)curadjblock->ptr);
279 adjoffset +=
sizeof(U);
280 curadjblock->ptr +=
sizeof(U);
// Yields a pointer of type U* into the current edge-data block and advances
// both edataoffset and the block pointer by sizeof(U).
// Returns NULL without touching any state when only_adjacency is set.
284 template <
typename U>
285 inline U * read_edgeptr() {
286 if (only_adjacency)
return NULL;
// Make sure the edge-data block covers the bytes about to be handed out.
287 check_curblock(
sizeof(U));
288 U * resptr = ((U*)curblock->ptr);
289 edataoffset +=
sizeof(U);
290 curblock->ptr +=
sizeof(U);
// Skips over n entries without reading them: moves the adjacency cursor by
// `tot` and the edge-data cursor by n * sizeof(ET).
// NOTE(review): the definition of `tot` and the update of adjoffset are not
// visible in this excerpt; presumably tot is derived from n and sz - confirm.
294 inline void skip(
int n,
int sz) {
// Block pointers are only moved when a block is currently loaded.
297 if (curadjblock != NULL)
298 curadjblock->ptr += tot;
299 edataoffset +=
sizeof(ET)*n;
300 if (curblock != NULL)
301 curblock->ptr +=
sizeof(ET)*n;
// Reads adjacency records for the vertex window that begins at vertex id
// `start`, attaching out-edges to the matching entries of prealloc.
//   nvecs          - upper bound (exclusive) of the loop index into prealloc
//   start          - first vertex id of the window; passed to move_close_to first
//   prealloc       - caller-provided vertices; prealloc[i] is asserted to have id() == curvid
//   record_index   - enables the periodic check inside the loop (presumably recording
//                    the current position in the sparse index - body not visible here)
//   disable_writes - NOTE(review): shadows the member of the same name; its use is not
//                    visible in this excerpt
308 void read_next_vertices(
int nvecs, vid_t start, std::vector<svertex_t> & prealloc,
bool record_index=
false,
bool disable_writes=
false) {
311 move_close_to(start);
// At most one block may still be held on entry.
316 assert(activeblocks.size() <= 1);
// Resume from the block still held in activeblocks, if any.
319 if (!activeblocks.empty() && !only_adjacency) {
320 curblock = &activeblocks[0];
322 vid_t lastrec = start;
323 window_start_edataoffset = edataoffset;
// i indexes prealloc; it starts at curvid - start, computed in int.
325 for(
int i=((
int)curvid) - ((int)start); i<nvecs; i++) {
// Stop once the adjacency file is exhausted.
326 if (adjoffset >= adjfilesize)
break;
// Threshold for the index check: at least max(100000, nvecs/16) vertices since lastrec.
331 if (record_index && (
size_t)(curvid - lastrec) >= (
size_t) std::max((
int)100000, nvecs/16)) {
// NOTE(review): ns, nz and n are decoded from the adjacency stream; the logic
// that ties them together is not visible in this excerpt.
335 uint8_t ns = read_val<uint8_t>();
338 uint8_t nz = read_val<uint8_t>();
345 n = read_val<uint32_t>();
352 skip(n,
sizeof(vid_t));
354 svertex_t& vertex = prealloc[i];
355 assert(vertex.id() == curvid);
// Edges are read for scheduled vertices; presumably the skip() further below is
// the branch for unscheduled ones - confirm.
357 if (vertex.scheduled) {
360 bool special_edge =
false;
// When ET and ETspecial differ in size, the raw id goes through translate_edge,
// which presumably may set special_edge.
361 vid_t target = (
sizeof(ET) ==
sizeof(ETspecial) ? read_val<vid_t>() : translate_edge(read_val<vid_t>(), special_edge));
362 ET * evalue = (special_edge ? (ET*)read_edgeptr<ETspecial>(): read_edgeptr<ET>());
// First touch of an inactive edge-data block: start loading its contents, then
// mark the block active.
364 if (!only_adjacency) {
365 if (!curblock->active) {
366 if (async_edata_loading) {
367 curblock->read_async(iomgr);
369 if (need_read_outedges) curblock->read_now(iomgr);
373 curblock->active =
true;
375 vertex.add_outedge(target, evalue, special_edge);
// Targets are expected to lie in [range_st, range_end]; log details before the
// assert fires.
377 if (!((target >= range_st && target <= range_end))) {
378 logstream(
LOG_ERROR) <<
"Error : " << target <<
" not in [" << range_st <<
" - " << range_end <<
"]" << std::endl;
379 iomgr->print_session(adjfile_session);
381 assert(target >= range_st && target <= range_end);
386 skip(n,
sizeof(vid_t));
391 m.stop_time(me,
"read_next_vertices");
// Writes block b back through the I/O manager unless writes are disabled.
//   b              - block to commit
//   synchronously  - presumably selects between the commit_now and commit_async
//                    paths; the branch itself is not visible in this excerpt
//   disable_writes - when true, neither commit call is made.
//                    NOTE(review): this parameter shadows the member of the same name.
399 void commit(
sblock &b,
bool synchronously,
bool disable_writes=
false) {
// Commit via commit_now; the operation is reported to metrics under "commit".
402 if (!disable_writes) b.commit_now(iomgr);
403 m.stop_time(me,
"commit");
// Commit via commit_async; with writes disabled the block is released instead.
406 if (!disable_writes) b.commit_async(iomgr);
407 else b.release(iomgr);
416 if (curadjblock != NULL) {
417 curadjblock->release(iomgr);
// Repositions the shard explicitly.
//   newoff  - new adjacency-file offset (stored in adjoffset)
//   _curvid - new current vertex id (stored in curvid)
//   edgeptr - new edge-data offset (stored in edataoffset)
426 void set_offset(
size_t newoff, vid_t _curvid,
size_t edgeptr) {
427 this->adjoffset = newoff;
428 this->curvid = _curvid;
429 this->edataoffset = edgeptr;
// Any loaded adjacency block is released after the offsets change.
430 if (curadjblock != NULL) {
431 curadjblock->release(iomgr);
441 for(
int i=(
int)activeblocks.size() - 1; i >= 0; i--) {
442 sblock &b = activeblocks[i];
443 if (b.end <= edataoffset || all) {
444 commit(b, all, disable_writes);
445 activeblocks.erase(activeblocks.begin() + (
unsigned int)i);
450 std::string get_info_json() {
451 std::stringstream json;
452 json <<
"\"size\": ";
453 json << edatafilesize << std::endl;
454 json <<
", \"windowStart\": ";
455 json << window_start_edataoffset;
456 json <<
", \"windowEnd\": ";