GraphChi  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Macros
slidingshard.hpp
Go to the documentation of this file.
1 
2 
30 #ifndef DEF_GRAPHCHI_SLIDINGSHARD
31 #define DEF_GRAPHCHI_SLIDINGSHARD
32 
33 
34 #include <iostream>
35 #include <cstdio>
36 #include <sstream>
37 #include <vector>
38 #include <fcntl.h>
39 #include <unistd.h>
40 #include <assert.h>
41 #include <string>
42 
43 #include "api/graph_objects.hpp"
44 #include "metrics/metrics.hpp"
45 #include "logger/logger.hpp"
46 #include "io/stripedio.hpp"
47 #include "graphchi_types.hpp"
48 
49 
50 namespace graphchi {
51 
55  struct sblock {
56 
57  int writedesc;
58  int readdesc;
59  size_t offset;
60  size_t end;
61  uint8_t * data;
62  uint8_t * ptr;
63  bool active;
64 
65  sblock() : writedesc(0), readdesc(0), active(false) { data = NULL; }
66  sblock(int wdesc, int rdesc) : writedesc(wdesc), readdesc(rdesc), active(false) { data = NULL; }
67 
68  void commit_async(stripedio * iomgr) {
69  if (active && data != NULL && writedesc >= 0) {
70  iomgr->managed_pwritea_async(writedesc, &data, end-offset, offset, true);
71  }
72  }
73 
74  void commit_now(stripedio * iomgr) {
75  if (active && data != NULL && writedesc >= 0) {
76  size_t len = ptr-data;
77  if (len> end-offset) len = end-offset;
78  iomgr->managed_pwritea_now(writedesc, &data, len, offset);
79  }
80  }
81  void read_async(stripedio * iomgr) {
82  iomgr->managed_preada_async(readdesc, &data, end-offset, offset);
83  }
84  void read_now(stripedio * iomgr) {
85  iomgr->managed_preada_now(readdesc, &data, end-offset, offset);
86  }
87 
88  void release(stripedio * iomgr) {
89  if (data != NULL) iomgr->managed_release(readdesc, &data);
90  data = NULL;
91  }
92  };
93 
94 
/**
 * One sparse-index record: the byte offsets into the adjacency file and
 * the edge-data file at which a given vertex's data begins.
 */
struct indexentry {
    size_t adjoffset;
    size_t edataoffset;
    indexentry(size_t a, size_t e) : adjoffset(a), edataoffset(e) {}
};
99 
/*
 * Graph shard that is streamed, i.e. it can only be read in one direction,
 * a chunk at a time.
 */
104  template <typename VT, typename ET, typename svertex_t = graphchi_vertex<VT, ET>, typename ETspecial = ET>
106 
    stripedio * iomgr;                     // io manager used for all file access (not owned)

    std::string filename_edata;            // edge-data file backing this shard
    std::string filename_adj;              // adjacency file backing this shard
    vid_t range_st, range_end;             // vertex-id interval covered by this shard
    size_t blocksize;                      // streaming window size in bytes (rounded to a multiple of sizeof(ET))

    vid_t curvid;                          // next vertex id the stream will produce
    size_t adjoffset, edataoffset, adjfilesize, edatafilesize;  // byte cursors and total file sizes
    size_t window_start_edataoffset;       // edata offset at the start of the current window

    std::vector<sblock> activeblocks;      // edge-data blocks awaiting commit/release
    int edata_session;                     // io session for edge data (-1 in adjacency-only mode)
    int adjfile_session;                   // io session for the adjacency file
    int writedesc;
    sblock * curblock;                     // current edge-data block; points into activeblocks
    sblock * curadjblock;                  // current adjacency block; heap-allocated, owned here
    metrics &m;

    std::map<int, indexentry> sparse_index; // Sparse index that can be created on the fly.
                                            // Keys are negated vertex ids (see save_offset()).
    bool disable_writes;                   // when true, modified edge data is not written back
    bool async_edata_loading;              // load edge data asynchronously (disabled for computational edges)
    bool need_read_outedges; // In this model, we need not read edge data but must be careful when committing it

  public:
    bool only_adjacency;                   // when true, edge data is never loaded at all
134 
135  sliding_shard(stripedio * iomgr, std::string _filename_edata, std::string _filename_adj, vid_t _range_st, vid_t _range_en, size_t _blocksize, metrics &_m,
136  bool _disable_writes=false, bool onlyadj = false) :
137  iomgr(iomgr),
138  filename_edata(_filename_edata),
139  filename_adj(_filename_adj),
140  range_st(_range_st),
141  range_end(_range_en),
142  blocksize(_blocksize),
143  m(_m),
144  disable_writes(_disable_writes) {
145  curvid = 0;
146  adjoffset = 0;
147  edataoffset = 0;
148  disable_writes = false;
149  only_adjacency = onlyadj;
150  curblock = NULL;
151  curadjblock = NULL;
152  window_start_edataoffset = 0;
153 
154  while(blocksize % sizeof(ET) != 0) blocksize++;
155  assert(blocksize % sizeof(ET)==0);
156 
157  edatafilesize = get_filesize(filename_edata);
158  adjfilesize = get_filesize(filename_adj);
159 
160  if (!only_adjacency)
161  edata_session = iomgr->open_session(filename_edata);
162  else edata_session = -1;
163 
164  adjfile_session = iomgr->open_session(filename_adj, true);
165 
166  save_offset();
167 
168  async_edata_loading = !svertex_t().computational_edges();
169 #ifdef SUPPORT_DELETIONS
170  async_edata_loading = false; // See comment above for memshard, async_edata_loading = false;
171 #endif
172  need_read_outedges = svertex_t().read_outedges();
173  }
174 
175  ~sliding_shard() {
177  if (curblock != NULL) {
178  curblock->release(iomgr);
179  delete curblock;
180  curblock = NULL;
181  }
182  if (curadjblock != NULL) {
183  curadjblock->release(iomgr);
184  delete curadjblock;
185  curadjblock = NULL;
186  }
187 
188  if (edata_session>=0) iomgr->close_session(edata_session);
189  iomgr->close_session(adjfile_session);
190  }
191 
192 
193  size_t num_edges() {
194  return edatafilesize / sizeof(ET);
195  }
196 
197  protected:
    size_t get_adjoffset() { return adjoffset; }     // current byte offset in the adjacency file
    size_t get_edataoffset() { return edataoffset; } // current byte offset in the edge-data file
200 
201  void save_offset() {
202  // Note, so that we can use the lower bound operation in map, we need
203  // to insert indices in reverse order
204  sparse_index.insert(std::pair<int, indexentry>(-((int)curvid), indexentry(adjoffset, edataoffset)));
205  }
206 
    /**
     * Fast-forwards the stream cursors to the sparse-index entry closest to
     * (but not past) vertex v. No-op when the cursor is already at or past v.
     */
    void move_close_to(vid_t v) {
        if (curvid >= v) return;

        // Index keys are negated vertex ids, so lower_bound(-v) yields the
        // entry with the largest recorded vertex id <= v (see save_offset()).
        // Vertex 0 is always in the index, so the iterator is never end().
        std::map<int,indexentry>::iterator lowerbd_iter = sparse_index.lower_bound(-((int)v));
        int closest_vid = -((int)lowerbd_iter->first);
        assert(closest_vid>=0);
        indexentry closest_offset = lowerbd_iter->second;
        assert(closest_vid <= (int)v);
        if (closest_vid > (int)curvid) { /* Note: this will fail if we have over 2B vertices! */
            logstream(LOG_DEBUG)
                << "Sliding shard, start: " << range_st << " moved to: " << closest_vid << " " << closest_offset.adjoffset << ", asked for : " << v << " was in: curvid= " << curvid << " " << adjoffset << std::endl;
            // Advance the block cursors by the same delta as the file offsets.
            if (curblock != NULL) // Move the pointer - this may invalidate the curblock, but it is being checked later
                curblock->ptr += closest_offset.edataoffset - edataoffset;
            if (curadjblock != NULL)
                curadjblock->ptr += closest_offset.adjoffset - adjoffset;
            curvid = (vid_t)closest_vid;
            adjoffset = closest_offset.adjoffset;
            edataoffset = closest_offset.edataoffset;
            return;
        } else {
            // Do nothing - just continue from current pos.
            return;
        }

    }
232 
    /* NOTE: here is a subtle bug because two blocks can overlap */
    /**
     * Ensures curblock covers at least `toread` bytes starting at
     * edataoffset; otherwise allocates the next window of the edge-data
     * file and appends it to activeblocks.
     */
    inline void check_curblock(size_t toread) {
        if (curblock == NULL || curblock->end < edataoffset+toread) {
            if (curblock != NULL) {
                if (!curblock->active) {
                    // Block contained no scheduled vertex; its buffer can go now.
                    curblock->release(iomgr);
                }
            }
            // Load next
            sblock newblock(edata_session, edata_session);
            newblock.offset = edataoffset;
            newblock.end = std::min(edatafilesize, edataoffset+blocksize);
            assert(newblock.end >= newblock.offset );

            iomgr->managed_malloc(edata_session, &newblock.data, newblock.end - newblock.offset, newblock.offset);
            newblock.ptr = newblock.data;
            // NOTE(review): push_back may reallocate activeblocks and
            // invalidate earlier pointers into it; curblock is re-pointed
            // below, but any external alias would dangle — confirm callers.
            activeblocks.push_back(newblock);
            curblock = &activeblocks[activeblocks.size()-1];
        }
    }
253 
254  inline void check_adjblock(size_t toread) {
255  if (curadjblock == NULL || curadjblock->end <= adjoffset + toread) {
256  if (curadjblock != NULL) {
257  curadjblock->release(iomgr);
258  delete curadjblock;
259  curadjblock = NULL;
260  }
261  sblock * newblock = new sblock(0, adjfile_session);
262  newblock->offset = adjoffset;
263  newblock->end = std::min(adjfilesize, adjoffset+blocksize);
264  assert(newblock->end > 0);
265  assert(newblock->end >= newblock->offset);
266  iomgr->managed_malloc(adjfile_session, &newblock->data, newblock->end - newblock->offset, adjoffset);
267  newblock->ptr = newblock->data;
268  metrics_entry me = m.start_time();
269  iomgr->managed_preada_now(adjfile_session, &newblock->data, newblock->end - newblock->offset, adjoffset);
270  m.stop_time(me, "blockload");
271  curadjblock = newblock;
272  }
273  }
274 
275  template <typename U>
276  inline U read_val() {
277  check_adjblock(sizeof(U));
278  U res = *((U*)curadjblock->ptr);
279  adjoffset += sizeof(U);
280  curadjblock->ptr += sizeof(U);
281  return res;
282  }
283 
284  template <typename U>
285  inline U * read_edgeptr() {
286  if (only_adjacency) return NULL;
287  check_curblock(sizeof(U));
288  U * resptr = ((U*)curblock->ptr);
289  edataoffset += sizeof(U);
290  curblock->ptr += sizeof(U);
291  return resptr;
292  }
293 
294  inline void skip(int n, int sz) {
295  size_t tot = n * sz;
296  adjoffset += tot;
297  if (curadjblock != NULL)
298  curadjblock->ptr += tot;
299  edataoffset += sizeof(ET)*n;
300  if (curblock != NULL)
301  curblock->ptr += sizeof(ET)*n;
302  }
303 
304  public:
    /**
     * Streams the edges of the next `nvecs` vertices (ids start..start+nvecs-1)
     * into the preallocated vertex objects, attaching out-edge value pointers
     * that point into the managed edge-data blocks.
     * @param nvecs number of vertices to read.
     * @param start first vertex id of the window; prealloc[i] must have id start+i.
     * @param prealloc vertex objects to fill, indexed by (id - start).
     * @param record_index when true, build sparse-index entries while streaming
     *        (and do not seek with move_close_to()).
     * @param disable_writes passed through when prior blocks are committed.
     */
    void read_next_vertices(int nvecs, vid_t start, std::vector<svertex_t> & prealloc, bool record_index=false, bool disable_writes=false) {
        metrics_entry me = m.start_time();
        if (!record_index)
            move_close_to(start);

        /* Release the blocks we do not need anymore */
        curblock = NULL;
        release_prior_to_offset(false, disable_writes);
        assert(activeblocks.size() <= 1);

        /* Read next */
        if (!activeblocks.empty() && !only_adjacency) {
            curblock = &activeblocks[0];
        }
        vid_t lastrec = start;
        window_start_edataoffset = edataoffset;

        // i indexes prealloc; it is negative while curvid is still behind start.
        for(int i=((int)curvid) - ((int)start); i<nvecs; i++) {
            if (adjoffset >= adjfilesize) break;

            // TODO: skip unscheduled vertices.

            int n;
            // Record a sparse-index entry roughly every max(100000, nvecs/16) vertices.
            if (record_index && (size_t)(curvid - lastrec) >= (size_t) std::max((int)100000, nvecs/16)) {
                save_offset();
                lastrec = curvid;
            }
            uint8_t ns = read_val<uint8_t>();
            if (ns == 0x00) {
                // Zero marker: the next byte is a run length of additional
                // vertices that have no out-edges at all.
                curvid++;
                uint8_t nz = read_val<uint8_t>();
                curvid += nz;
                i += nz;
                continue;
            }

            if (ns == 0xff) {
                // Escape marker: the degree does not fit one byte; read 32 bits.
                n = read_val<uint32_t>();
            } else {
                n = ns;
            }

            if (i<0) {
                // Just skipping
                skip(n, sizeof(vid_t));
            } else {
                svertex_t& vertex = prealloc[i];
                assert(vertex.id() == curvid);

                if (vertex.scheduled) {

                    while(--n>=0) {
                        bool special_edge = false;
                        // When ET and ETspecial have the same size no translation is needed.
                        vid_t target = (sizeof(ET) == sizeof(ETspecial) ? read_val<vid_t>() : translate_edge(read_val<vid_t>(), special_edge));
                        ET * evalue = (special_edge ? (ET*)read_edgeptr<ETspecial>(): read_edgeptr<ET>());

                        if (!only_adjacency) {
                            // Lazily load the block's edge data the first time a
                            // scheduled vertex touches it.
                            if (!curblock->active) {
                                if (async_edata_loading) {
                                    curblock->read_async(iomgr);
                                } else {
                                    if (need_read_outedges) curblock->read_now(iomgr);
                                }
                            }
                            // Note: this needs to be set always because curblock might change during this loop.
                            curblock->active = true; // This block has a scheduled vertex - need to commit
                        }
                        vertex.add_outedge(target, evalue, special_edge);

                        // Sanity check: every target must lie in this shard's interval.
                        if (!((target >= range_st && target <= range_end))) {
                            logstream(LOG_ERROR) << "Error : " << target << " not in [" << range_st << " - " << range_end << "]" << std::endl;
                            iomgr->print_session(adjfile_session);
                        }
                        assert(target >= range_st && target <= range_end);
                    }

                } else {
                    // This vertex was not scheduled, so we can just skip its edges.
                    skip(n, sizeof(vid_t));
                }
            }
            curvid++;
        }
        m.stop_time(me, "read_next_vertices");
        // Blocks remain in activeblocks; they are committed later via
        // release_prior_to_offset().
        curblock = NULL;
    }
394 
395 
399  void commit(sblock &b, bool synchronously, bool disable_writes=false) {
400  if (synchronously) {
401  metrics_entry me = m.start_time();
402  if (!disable_writes) b.commit_now(iomgr);
403  m.stop_time(me, "commit");
404  b.release(iomgr);
405  } else {
406  if (!disable_writes) b.commit_async(iomgr);
407  else b.release(iomgr);
408  }
409  }
410 
414  void flush() {
416  if (curadjblock != NULL) {
417  curadjblock->release(iomgr);
418  delete curadjblock;
419  curadjblock = NULL;
420  }
421  }
422 
426  void set_offset(size_t newoff, vid_t _curvid, size_t edgeptr) {
427  this->adjoffset = newoff;
428  this->curvid = _curvid;
429  this->edataoffset = edgeptr;
430  if (curadjblock != NULL) {
431  curadjblock->release(iomgr);
432  delete curadjblock;
433  curadjblock = NULL;
434  }
435  }
436 
    /**
     * Commits and removes every active block that ends at or before the
     * current edge-data offset (or every block, when all=true).
     * Iterates backwards so erase() never shifts an element we have yet to
     * visit; this also fixes the commit order (last block first).
     * @param all commit all blocks synchronously, regardless of offset.
     * @param disable_writes suppress write-back.
     */
    void release_prior_to_offset(bool all=false, bool disable_writes=false) { // disable writes is for the dynamic case
        for(int i=(int)activeblocks.size() - 1; i >= 0; i--) {
            sblock &b = activeblocks[i];
            if (b.end <= edataoffset || all) {
                commit(b, all, disable_writes);
                activeblocks.erase(activeblocks.begin() + (unsigned int)i);
            }
        }
    }
449 
450  std::string get_info_json() {
451  std::stringstream json;
452  json << "\"size\": ";
453  json << edatafilesize << std::endl;
454  json << ", \"windowStart\": ";
455  json << window_start_edataoffset;
456  json << ", \"windowEnd\": ";
457  json << edataoffset;
458  return json.str();
459  }
460 
461  };
462 
463 
464 
465 
466 };
467 
468 
469 
470 #endif
471