GraphChi  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Macros
memoryshard.hpp
Go to the documentation of this file.
1 
29 #ifndef DEF_GRAPHCHI_MEMSHARD
30 #define DEF_GRAPHCHI_MEMSHARD
31 
32 
#include <assert.h>
#include <fcntl.h>
#include <unistd.h>

#include <cstdio>
#include <cstring>
#include <iostream>
#include <sstream>
#include <string>
#include <vector>

#include "api/graph_objects.hpp"
#include "metrics/metrics.hpp"
#include "io/stripedio.hpp"
#include "graphchi_types.hpp"
46 
47 
48 namespace graphchi {
49 
50 
51  template <typename VT, typename ET, typename svertex_t = graphchi_vertex<VT, ET>, typename ETspecial = ET>
52  class memory_shard {
53 
54  stripedio * iomgr;
55 
56  std::string filename_edata;
57  std::string filename_adj;
58 
59  vid_t range_st;
60  vid_t range_end;
61  size_t adjfilesize;
62  size_t edatafilesize;
63 
64  size_t edgeptr;
65  vid_t streaming_offset_vid;
66  size_t streaming_offset; // The offset where streaming should continue
67  size_t range_start_offset; // First byte for this range's vertices (used for writing only outedges)
68  size_t range_start_edge_ptr;
69  size_t streaming_offset_edge_ptr;
70  uint8_t * adjdata;
71  ET * edgedata;
72  metrics &m;
73  uint64_t chunkid;
74 
75  int edata_iosession;
76  int adj_session;
77  streaming_task adj_stream_session;
78 
79  bool is_loaded;
80 
81  public:
82  bool only_adjacency;
83 
84  memory_shard(stripedio * iomgr,
85  std::string _filename_edata,
86  std::string _filename_adj,
87  vid_t _range_start,
88  vid_t _range_end,
89  metrics &_m) : iomgr(iomgr), filename_edata(_filename_edata),
90  filename_adj(_filename_adj),
91  range_st(_range_start), range_end(_range_end), m(_m) {
92  edgedata = NULL;
93  adjdata = NULL;
94  only_adjacency = false;
95  is_loaded = false;
96  adj_session = -1;
97  edata_iosession = -1;
98  }
99 
100  ~memory_shard() {
101 
102  if (edata_iosession >= 0) {
103  if (edgedata != NULL) iomgr->managed_release(edata_iosession, &edgedata);
104  iomgr->close_session(edata_iosession);
105  }
106  if (adj_session >= 0) {
107  if (adjdata != NULL) iomgr->managed_release(adj_session, &adjdata);
108  iomgr->close_session(adj_session);
109  }
110  }
111 
112  void commit(bool all) {
113  if (edgedata == NULL || only_adjacency) return;
114  assert(is_loaded);
115  metrics_entry cm = m.start_time();
116 
123  if (all) {
124  iomgr->managed_pwritea_now(edata_iosession, &edgedata, edatafilesize, 0);
125  } else {
126  size_t last = streaming_offset_edge_ptr;
127  if (last == 0){
128  // rollback
129  last = edatafilesize;
130  }
131  char * bufp = ((char*)edgedata + range_start_edge_ptr);
132  iomgr->managed_pwritea_now(edata_iosession, &bufp, last - range_start_edge_ptr, range_start_edge_ptr);
133 
134  }
135  m.stop_time(cm, "memshard_commit");
136 
137  iomgr->managed_release(adj_session, &adjdata);
138  iomgr->managed_release(edata_iosession, &edgedata);
139  is_loaded = false;
140  }
141 
142  bool loaded() {
143  return is_loaded;
144  }
145 
146  // TODO: recycle ptr!
147  void load() {
148  is_loaded = true;
149  adjfilesize = get_filesize(filename_adj);
150  edatafilesize = get_filesize(filename_edata);
151 
152  bool async_inedgedata_loading = !svertex_t().computational_edges();
153 
154 #ifdef SUPPORT_DELETIONS
155  async_inedgedata_loading = false; // Currently we encode the deleted status of an edge into the edge value (should be changed!),
156  // so we need the edge data while loading
157 #endif
158 
159  //preada(adjf, adjdata, adjfilesize, 0);
160 
161  adj_session = iomgr->open_session(filename_adj, true);
162  iomgr->managed_malloc(adj_session, &adjdata, adjfilesize, 0);
163  adj_stream_session = streaming_task(iomgr, adj_session, adjfilesize, (char**) &adjdata);
164 
165  iomgr->launch_stream_reader(&adj_stream_session);
166  /* Initialize edge data asynchonous reading */
167  if (!only_adjacency) {
168  edata_iosession = iomgr->open_session(filename_edata, false);
169 
170  iomgr->managed_malloc(edata_iosession, &edgedata, edatafilesize, 0);
171  if (async_inedgedata_loading) {
172  iomgr->managed_preada_async(edata_iosession, &edgedata, edatafilesize, 0);
173  } else {
174  iomgr->managed_preada_now(edata_iosession, &edgedata, edatafilesize, 0);
175  }
176  }
177  }
178 
179  inline void check_stream_progress(int toread, size_t pos) {
180  if (adj_stream_session.curpos == adjfilesize) return;
181 
182  while(adj_stream_session.curpos < toread+pos) {
183  usleep(20000);
184  if (adj_stream_session.curpos == adjfilesize) return;
185  }
186  }
187 
188  void load_vertices(vid_t window_st, vid_t window_en, std::vector<svertex_t> & prealloc, bool inedges=true, bool outedges=true) {
189  /* Find file size */
190 
191  m.start_time("memoryshard_create_edges");
192 
193  assert(adjdata != NULL);
194 
195  // Now start creating vertices
196  uint8_t * ptr = adjdata;
197  uint8_t * end = ptr + adjfilesize;
198  vid_t vid = 0;
199  edgeptr = 0;
200 
201  streaming_offset = 0;
202  streaming_offset_vid = 0;
203  streaming_offset_edge_ptr = 0;
204  range_start_offset = adjfilesize;
205  range_start_edge_ptr = edatafilesize;
206 
207  bool setoffset = false;
208  bool setrangeoffset = false;
209  while (ptr < end) {
210  check_stream_progress(6, ptr-adjdata); // read at least 6 bytes
211  if (!setoffset && vid > range_end) {
212  // This is where streaming should continue. Notice that because of the
213  // non-zero counters, this might be a bit off.
214  streaming_offset = ptr-adjdata;
215  streaming_offset_vid = vid;
216  streaming_offset_edge_ptr = edgeptr;
217  setoffset = true;
218  }
219  if (!setrangeoffset && vid>=range_st) {
220  range_start_offset = ptr-adjdata;
221  range_start_edge_ptr = edgeptr;
222  setrangeoffset = true;
223  }
224 
225  uint8_t ns = *ptr;
226  int n;
227 
228  ptr += sizeof(uint8_t);
229 
230  if (ns == 0x00) {
231  // next value tells the number of vertices with zeros
232  uint8_t nz = *ptr;
233  ptr += sizeof(uint8_t);
234  vid++;
235  vid += nz;
236  continue;
237  }
238 
239  if (ns == 0xff) { // If 255 is not enough, then stores a 32-bit integer after.
240  n = *((uint32_t*)ptr);
241  ptr += sizeof(uint32_t);
242  } else {
243  n = ns;
244  }
245  svertex_t* vertex = NULL;
246 
247  if (vid>=window_st && vid <=window_en) { // TODO: Make more efficient
248  vertex = &prealloc[vid-window_st];
249  if (!vertex->scheduled) vertex = NULL;
250  }
251  check_stream_progress(n*4, ptr-adjdata);
252  while(--n>=0) {
253  bool special_edge = false;
254  vid_t target = (sizeof(ET)==sizeof(ETspecial) ? *((vid_t*) ptr) : translate_edge(*((vid_t*) ptr), special_edge));
255  ptr += sizeof(vid_t);
256 
257 
258  if (vertex != NULL && outedges)
259  {
260  vertex->add_outedge(target, (only_adjacency ? NULL : (ET*) &((char*)edgedata)[edgeptr]), special_edge);
261  }
262 
263  if (target >= window_st) {
264  if (target <= window_en) {
265  /* In edge */
266  if (inedges) {
267  svertex_t & dstvertex = prealloc[target-window_st];
268  if (dstvertex.scheduled) {
269  assert(only_adjacency || edgeptr < edatafilesize);
270  dstvertex.add_inedge(vid, (only_adjacency ? NULL : (ET*) &((char*)edgedata)[edgeptr]), special_edge);
271  if (vertex != NULL) {
272  dstvertex.parallel_safe = false;
273  vertex->parallel_safe = false; // This edge is shared with another vertex in the same window - not safe to run in parallel.
274  }
275  }
276  }
277  } else if (sizeof(ET) == sizeof(ETspecial)) { // Note, we cannot skip if there can be "special edges". FIXME so dirty.
278  // This vertex has no edges any more for this window, bail out
279  if (vertex == NULL) {
280  ptr += sizeof(vid_t)*n;
281  edgeptr += (n+1)*sizeof(ET);
282  break;
283  }
284  }
285  }
286  edgeptr += sizeof(ET) * !special_edge + sizeof(ETspecial) * special_edge;
287  }
288  vid++;
289  }
290  m.stop_time("memoryshard_create_edges", false);
291  }
292 
293  size_t offset_for_stream_cont() {
294  return streaming_offset;
295  }
296  vid_t offset_vid_for_stream_cont() {
297  return streaming_offset_vid;
298  }
299  size_t edata_ptr_for_stream_cont() {
300  return streaming_offset_edge_ptr;
301  }
302 
303 
304 
305 
306  };
307 };
308 
309 #endif
310