GraphChi  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Macros
functional_engine.hpp
Go to the documentation of this file.
1 
32 #ifndef GRAPHCHI_FUNCTIONALENGINE_DEF
33 #define GRAPHCHI_FUNCTIONALENGINE_DEF
34 
36 #include "logger/logger.hpp"
37 
38 namespace graphchi {
39 
40  template <typename VertexDataType, typename EdgeDataType, typename fvertex_t>
41  class functional_engine : public graphchi_engine<VertexDataType, EdgeDataType, fvertex_t> {
42  public:
43  functional_engine(std::string base_filename, int nshards, bool selective_scheduling, metrics &_m) :
44  graphchi_engine<VertexDataType, EdgeDataType, fvertex_t>(base_filename, nshards, selective_scheduling, _m){
45  _m.set("engine", "functional");
46  }
47 
48  protected:
49  /* Override - load only memory shard (i.e inedges) */
50  virtual void load_before_updates(std::vector<fvertex_t> &vertices) {
51  logstream(LOG_DEBUG) << "Processing in-edges." << std::endl;
52  /* Load memory shard */
53  if (!this->memoryshard->loaded()) {
54  this->memoryshard->load();
55  }
56 
57  /* Load vertex edges from memory shard */
58  this->memoryshard->load_vertices(this->sub_interval_st, this->sub_interval_en, vertices, true, false);
59 
60  /* Load vertices */
61  this->vertex_data_handler->load(this->sub_interval_st, this->sub_interval_en);
62 
63  /* Wait for all reads to complete */
64  this->iomgr->wait_for_reads();
65  }
66 
67 
68  /* Override - do not allocate edge data */
69  virtual void init_vertices(std::vector<fvertex_t> &vertices,
70  graphchi_edge<EdgeDataType> * &edata) {
71  int nvertices = vertices.size();
72  /* Assign vertex edge array pointers */
73  int ecounter = 0;
74  for(int i=0; i < nvertices; i++) {
75  degree d = this->degree_handler->get_degree(this->sub_interval_st + i);
76  int inc = d.indegree;
77  int outc = d.outdegree;
78  vertices[i] = fvertex_t(this->chicontext, this->sub_interval_st + i, inc, outc);
79  if (this->scheduler != NULL) {
80  bool is_sched = this->scheduler->is_scheduled(this->sub_interval_st + i);
81  if (is_sched) {
82  vertices[i].scheduled = true;
83  this->nupdates++;
84  ecounter += inc + outc;
85  }
86  } else {
87  this->nupdates++;
88  vertices[i].scheduled = true;
89  ecounter += inc + outc;
90  }
91  }
92  this->work += ecounter;
93  }
94 
95 
96  /* Override - now load sliding shards, to write (broadcast) to out vertices */
97  virtual void load_after_updates(std::vector<fvertex_t> &vertices) {
98  logstream(LOG_DEBUG) << "Processing out-edges (broadcast)." << std::endl;
99  for(int p=0; p < this->nshards; p++) {
100  /* Stream forward other than the window partition */
101  if (p != this->exec_interval) {
102  this->sliding_shards[p]->read_next_vertices(vertices.size(), this->sub_interval_st, vertices,
103  this->scheduler != NULL && this->iter == 0);
104 
105  } else {
106  this->memoryshard->load_vertices(this->sub_interval_st, this->sub_interval_en, vertices, false, true); // Inedges=false, outedges=true
107  }
108 
109  }
110  }
111 
112  }; // End class
113 
114 }; // End namespace
115 
116 
117 #endif
118 
119