GraphChi  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Macros
graphchi_graphlabv2_1.hpp
Go to the documentation of this file.
1 
29 #ifndef DEF_GRAPHLAB_WRAPPERS
30 #define DEF_GRAPHLAB_WRAPPERS
31 
33 
34 using namespace graphchi;
35 
36 
37 namespace graphlab {
38 
/* Two empty tag types. Neither is referenced anywhere else in this listing.
   NOTE(review): presumably they exist so that GraphLab vertex programs which
   name graphlab::IS_POD_TYPE or graphlab::empty still compile -- confirm. */
39  struct IS_POD_TYPE { };
40  struct empty {};
41 
/* Selects which of a vertex's edges a gather or scatter phase visits.
   GraphLabWrapper switches on this value: IN_EDGES loops over the in-edges,
   OUT_EDGES over the out-edges, ALL_EDGES does both (in-edges first) and
   NO_EDGES visits nothing. */
42  enum edge_dir_type {
48  NO_EDGES = 0, // visit no edges
53  IN_EDGES = 1, // visit in-edges only
58  OUT_EDGES = 2 , // visit out-edges only
65  ALL_EDGES = 3 // visit in-edges, then out-edges
66  };
67 
68 
/* GraphChi stand-in for GraphLab's icontext: exposes engine state to a vertex
   program by forwarding to the wrapped graphchi_context.
   NOTE(review): vertex_type is used by signal(), post_delta() and
   clear_gather_cache(), but no typedef for it is visible in this listing (the
   embedded line numbers skip from 80 to 86 to 91) -- confirm against the
   original header. */
69  template<typename GraphType,
70  typename GatherType,
71  typename MessageType>
72  class icontext {
73  public:
74  // Type members ===========================================================
75 
79  typedef GraphType graph_type;
80 
86 
91  typedef typename graph_type::vertex_id_type vertex_id_type;
92 
97  typedef MessageType message_type;
98 
103  typedef GatherType gather_type;
104 
105  /* GraphChi */
     // Wrapped GraphChi context. Held as a raw pointer; this class never
     // deletes it (the destructor below is empty).
106  graphchi_context * gcontext;
107 
108 
109 
110  public:
111 
     // Stores the pointer as-is; no null check is performed here or in any
     // of the accessors below.
112  icontext(graphchi_context * gcontext) : gcontext(gcontext) {}
113 
115  virtual ~icontext() { }
116 
     // Number of vertices, read from gcontext->nvertices.
122  virtual size_t num_vertices() const { return gcontext->nvertices; }
123 
     // Not implemented: trips the assert, and returns 0 if the assert is
     // compiled out.
131  virtual size_t num_edges() const { assert(false); return 0; } // Not implemented yet
132 
     // The "process id" is the calling OpenMP thread's number.
143  virtual size_t procid() const { return (size_t) omp_get_thread_num(); }
144 
     // Plain std::cout; no per-thread buffering is added here.
159  virtual std::ostream& cout() const { return std::cout; }
160 
     // Plain std::cerr.
175  virtual std::ostream& cerr() const { return std::cerr; }
176 
     // The "process count" is gcontext->execthreads.
188  virtual size_t num_procs() const { return gcontext->execthreads; }
189 
     // Forwards to gcontext->runtime(); the result is converted to float.
195  virtual float elapsed_seconds() const { return gcontext->runtime(); }
196 
     // Current iteration number as stored in the GraphChi context.
203  virtual int iteration() const { return gcontext->iteration; }
204 
     // Marks the current iteration as the last one.
     // NOTE(review): presumably the engine finishes the running iteration
     // and then stops -- confirm against graphchi_context/engine.
215  virtual void stop() {
216  gcontext->last_iteration = gcontext->iteration;
217  }
218 
     // Schedules `vertex` by adding its id to the GraphChi scheduler.
     // `message` is ignored. gcontext->scheduler is dereferenced without a
     // null check.
237  virtual void signal(const vertex_type& vertex,
238  const message_type& message = message_type()) {
239  gcontext->scheduler->add_task(vertex.id());
240  }
241 
     // Same as signal(), but takes the vertex id directly. `message` is
     // ignored here as well.
253  virtual void signal_vid(vertex_id_type gvid,
254  const message_type& message = message_type()) {
255  gcontext->scheduler->add_task(gvid);
256  }
257 
     // Not implemented: always trips the assert; does nothing if asserts are
     // compiled out.
282  virtual void post_delta(const vertex_type& vertex,
283  const gather_type& delta) {
284  assert(false); // Not implemented
285  }
286 
     // Not implemented: always trips the assert; does nothing if asserts are
     // compiled out.
295  virtual void clear_gather_cache(const vertex_type& vertex) {
296  assert(false); // Not implemented
297  }
298 
299  }; // end of icontext
300 
301 
302  /* Forward declarations */
303  template <typename GLVertexDataType, typename EdgeDataType>
304  struct GraphLabVertexWrapper;
305 
306  template <typename GLVertexDataType, typename EdgeDataType>
307  struct GraphLabEdgeWrapper;
308 
309  /* Fake distributed graph type (this is often hard-coded
310  in GraphLab vertex programs). */
311  template <typename vertex_data, typename edge_data>
314  typedef edge_data edge_data_type;
317  typedef graphchi::vid_t vertex_id_type;
318  };
319 
320 
321  /* GraphChi's version of the ivertex_program */
/* Base class for gather-apply-scatter vertex programs. Only apply() is pure
   virtual. The other hooks have defaults: init() does nothing, gather_edges()
   returns IN_EDGES, scatter_edges() returns OUT_EDGES, and gather()/scatter()
   log a LOG_FATAL message.
   NOTE(review): the class header (listing line 324) and listing lines 334-336
   are absent from this listing, so icontext_type, vertex_type and edge_type
   are used below without a visible definition. */
322  template<typename Graph,
323  typename GatherType, typename MessageType = bool>
325 
326  /* Type definitions */
327  typedef typename Graph::vertex_data_type vertex_data_type;
328  typedef typename Graph::edge_data_type edge_data_type;
329  typedef GatherType gather_type;
330  typedef MessageType message_type;
331 
332  typedef Graph graph_type;
333  typedef typename graphchi::vid_t vertex_id_type;
337 
338  typedef graphlab::edge_dir_type edge_dir_type;
339 
     // Called before the gather phase; the default implementation is empty.
340  virtual void init(icontext_type& context,
341  const vertex_type& vertex,
342  const message_type& msg) { }
343 
     // Which edges gather() is invoked on. Defaults to the in-edges.
348  virtual edge_dir_type gather_edges(icontext_type& context,
349  const vertex_type& vertex) const {
350  return IN_EDGES;
351  }
352 
     // Per-edge gather. The default logs at LOG_FATAL and returns a
     // default-constructed gather_type, so a program that keeps the default
     // gather_edges() is expected to override this.
358  virtual gather_type gather(icontext_type& context,
359  const vertex_type& vertex,
360  edge_type& edge) const {
361  logstream(LOG_FATAL) << "Gather not implemented!" << std::endl;
362  return gather_type();
363  };
364 
     // Receives the combined gather result; must be provided by subclasses.
369  virtual void apply(icontext_type& context,
370  vertex_type& vertex,
371  const gather_type& total) = 0;
372 
     // Which edges scatter() is invoked on. Defaults to the out-edges.
377  virtual edge_dir_type scatter_edges(icontext_type& context,
378  const vertex_type& vertex) const {
379  return OUT_EDGES;
380  }
381 
     // Per-edge scatter. The default only logs at LOG_FATAL.
387  virtual void scatter(icontext_type& context, const vertex_type& vertex,
388  edge_type& edge) const {
389  logstream(LOG_FATAL) << "Scatter not implemented!" << std::endl;
390  };
391  };
392 
/* GraphLab-style vertex handle built on top of GraphChi.
   It carries the vertex id, a pointer to the GraphChi vertex object and a
   pointer to the in-memory array that holds the GraphLab vertex data.
   The GraphChi vertex pointer is NULL for handles that describe a neighbor
   (GraphLabEdgeWrapper creates those with NULL); degree queries on such a
   handle log an error and return 0.
   NOTE(review): the struct header (listing line 394) and listing line 397
   are absent from this listing, so vertex_type is used below without a
   visible definition. */
393  template <typename GLVertexDataType, typename EdgeDataType>
395  typedef graphchi_vertex<bool, EdgeDataType> VertexType; // Confusing!
396  typedef GLVertexDataType vertex_data_type;
398 
399  graphchi::vid_t vertexId;
     // GraphChi vertex object, or NULL when this handle describes a neighbor.
400  VertexType * vertex;
     // Vertex data array indexed by vertex id; not owned by the wrapper.
401  std::vector<GLVertexDataType> * vertexArray;
402 
403  GraphLabVertexWrapper(graphchi::vid_t vertexId, VertexType * vertex,
404  std::vector<GLVertexDataType> * vertexArray): vertexId(vertexId),
405  vertex(vertex), vertexArray(vertexArray) { }
406 
     // Two handles are equal when they name the same vertex id.
407  bool operator==(vertex_type& other) const {
408  return vertexId == other.vertexId;
409  }
410 
     // Read-only access to this vertex's entry in the in-memory array.
     // The index is not bounds-checked.
412  const vertex_data_type& data() const {
413  return (*vertexArray)[vertexId];
414  }
415 
     // Mutable access to this vertex's entry in the in-memory array.
417  vertex_data_type& data() {
418  return (*vertexArray)[vertexId];
419  }
420 
     // Number of in-edges, or 0 (with an error log) for a neighbor handle.
422  size_t num_in_edges() const {
423  if (vertex == NULL) {
424  logstream(LOG_ERROR) << "GraphChi does not support asking neighbor vertices in/out degrees." << std::endl;
425  return 0;
426  }
     // Count in-edges only, using the same num_inedges() accessor that the
     // gather/scatter loops in this file use. The previous num_edges() call
     // is the accessor the edge aggregator uses to walk every edge of a
     // vertex, so it over-counted.
427  return vertex->num_inedges();
428  }
429 
     // Number of out-edges, or 0 (with an error log) for a neighbor handle.
431  size_t num_out_edges() const {
432  if (vertex == NULL) {
433  logstream(LOG_ERROR) << "GraphChi does not support asking neighbor vertices in/out degrees." << std::endl;
434  return 0;
435  }
436  return vertex->num_outedges();
437  }
438 
     // Vertex id.
440  graphchi::vid_t id() const {
441  return vertexId;
442  }
443 
     // Returns the same value as id(); no separate local numbering is kept.
447  graphchi::vid_t local_id() const {
448  return vertexId;
449  }
450 
451  };
452 
453 
/* GraphLab-style edge handle built on top of a GraphChi edge.
   `vertex` is the vertex currently being updated and `edge->vertex_id()` is
   the other endpoint (the neighbor, for which no GraphChi vertex object is
   available -- hence the NULL passed to its wrapper). `is_inedge` records
   whether the edge points into `vertex` (true) or out of it (false).
   NOTE(review): the struct header (listing lines 455-456), listing line 459,
   the `edge` member (line 461) and the first line of the constructor
   (line 466) are absent from this listing. */
454  template <typename GLVertexDataType, typename EdgeDataType>
457  typedef GLVertexDataType vertex_data_type;
458  typedef EdgeDataType edge_data_type;
460 
462  VertexType * vertex;
463  std::vector<GLVertexDataType> * vertexArray;
464  bool is_inedge;
465 
467  std::vector<GLVertexDataType> * vertexArray, bool is_inedge):
468  edge(edge), vertex(vertex), vertexArray(vertexArray), is_inedge(is_inedge) { }
469 
470 
471  public:
472 
     // Source endpoint of the edge, returned by value.
     // An out-edge starts at the current vertex; an in-edge starts at the
     // neighbor. (The branches were previously the other way round, so an
     // in-edge reported the current vertex as its own source.)
484  vertex_type source() const {
485  if (!is_inedge) {
486  return GraphLabVertexWrapper<GLVertexDataType, EdgeDataType>(vertex->id(), vertex, vertexArray);
487  } else {
488  return GraphLabVertexWrapper<GLVertexDataType, EdgeDataType>(edge->vertex_id(), NULL, vertexArray);
489  }
490  }
491 
     // Target endpoint of the edge, returned by value.
     // An in-edge ends at the current vertex; an out-edge ends at the
     // neighbor.
504  vertex_type target() const {
505  if (is_inedge) {
506  return GraphLabVertexWrapper<GLVertexDataType, EdgeDataType>(vertex->id(), vertex, vertexArray);
507  } else {
508  return GraphLabVertexWrapper<GLVertexDataType, EdgeDataType>(edge->vertex_id(), NULL, vertexArray);
509  }
510  }
511 
     // Read-only view of the edge value stored behind edge->data_ptr.
515  const edge_data_type& data() const { return const_cast<edge_data_type&>(*edge->data_ptr); }
516 
     // Mutable view of the edge value stored behind edge->data_ptr.
520  edge_data_type& data() { return *(edge->data_ptr); }
521 
522  }; // end of edge_type
523 
524 
525  template <class GraphLabVertexProgram>
526  struct GraphLabWrapper : public GraphChiProgram<bool, typename GraphLabVertexProgram::edge_data_type> {
527  typedef bool VertexDataType; /* Temporary hack: as the vertices are stored in memory, no need to store on disk. */
528  typedef typename GraphLabVertexProgram::vertex_data_type GLVertexDataType;
529  typedef typename GraphLabVertexProgram::edge_data_type EdgeDataType;
530  typedef typename GraphLabVertexProgram::gather_type gather_type;
532  typedef typename GraphLabVertexProgram::message_type message_type;
533 
534  std::vector<GLVertexDataType> * vertexInmemoryArray;
535 
536  GraphLabWrapper() {
537  vertexInmemoryArray = new std::vector<GLVertexDataType>();
538  }
539 
543  virtual void before_iteration(int iteration, graphchi_context &gcontext) {
544  if (gcontext.iteration == 0) {
545  logstream(LOG_INFO) << "Initialize vertices in memory." << std::endl;
546  vertexInmemoryArray->resize(gcontext.nvertices);
547  }
548  }
549 
553  virtual void after_iteration(int iteration, graphchi_context &gcontext) {
554  }
555 
559  virtual void before_exec_interval(vid_t window_st, vid_t window_en, graphchi_context &gcontext) {
560  }
561 
565  virtual void after_exec_interval(vid_t window_st, vid_t window_en, graphchi_context &gcontext) {
566  }
567 
573 
574  /* Create the vertex program */
575  GraphLabVertexWrapper<GLVertexDataType, EdgeDataType> wrapperVertex(vertex.id(), &vertex, vertexInmemoryArray);
576  GraphLabVertexProgram glVertexProgram;
577 
578  /* Init */
579  glVertexProgram.init(glcontext, wrapperVertex, typename GraphLabVertexProgram::message_type());
580  const GraphLabVertexProgram& const_vprog = glVertexProgram;
581 
582  /* Gather */
583  edge_dir_type gather_direction = const_vprog.gather_edges(glcontext, wrapperVertex);
584  gather_type sum;
585 
586  int gathered = 0;
587  switch (gather_direction) {
588  case ALL_EDGES:
589  case IN_EDGES:
590  for(int i=0; i < vertex.num_inedges(); i++) {
591  GraphLabEdgeWrapper<GLVertexDataType, EdgeDataType> edgeWrapper(vertex.inedge(i), &vertex, vertexInmemoryArray, true);
592  if (gathered > 0) sum += const_vprog.gather(glcontext, wrapperVertex, edgeWrapper);
593  else sum = const_vprog.gather(glcontext, wrapperVertex, edgeWrapper);
594  gathered++;
595  }
596  if (gather_direction != ALL_EDGES)
597  break;
598  case OUT_EDGES:
599  for(int i=0; i < vertex.num_outedges(); i++) {
600  GraphLabEdgeWrapper<GLVertexDataType, EdgeDataType> edgeWrapper(vertex.outedge(i), &vertex, vertexInmemoryArray, false);
601  if (gathered > 0) sum += const_vprog.gather(glcontext, wrapperVertex, edgeWrapper);
602  else sum = const_vprog.gather(glcontext, wrapperVertex, edgeWrapper);
603  gathered++;
604  }
605  break;
606  case NO_EDGES:
607  break;
608  default:
609  assert(false); // Huh?
610  }
611 
612 
613  /* Apply */
614  glVertexProgram.apply(glcontext, wrapperVertex, sum);
615 
616  /* Scatter */
617  edge_dir_type scatter_direction = const_vprog.scatter_edges(glcontext, wrapperVertex);
618 
619  switch(scatter_direction) {
620  case ALL_EDGES:
621  case IN_EDGES:
622  for(int i=0; i < vertex.num_inedges(); i++) {
623  GraphLabEdgeWrapper<GLVertexDataType, EdgeDataType> edgeWrapper(vertex.inedge(i), &vertex, vertexInmemoryArray, true);
624  const_vprog.scatter(glcontext, wrapperVertex, edgeWrapper);
625  }
626  if (scatter_direction != ALL_EDGES)
627  break;
628  case OUT_EDGES:
629  for(int i=0; i < vertex.num_outedges(); i++) {
630  GraphLabEdgeWrapper<GLVertexDataType, EdgeDataType> edgeWrapper(vertex.outedge(i), &vertex, vertexInmemoryArray, false);
631  const_vprog.scatter(glcontext, wrapperVertex, edgeWrapper);
632  }
633  break;
634  case NO_EDGES:
635  break;
636  default:
637  assert(false); // Huh?
638  }
639 
640  /* Done! */
641  }
642 
643 
644  }; // End GraphLabWrapper
645 
/* GraphChi program that maps every edge through `map_function`, sums the
   results per execution thread and, after the iteration, merges the
   per-thread sums into `aggr` and passes it to `finalize_function`.
   The vertex data array is supplied by the caller and is not owned here.
   NOTE(review): listing line 690 (presumably the `glcontext` declaration used
   by after_iteration) and lines 706-710 (the update() signature and its
   `glcontext`) are absent from this listing. */
646  template <typename GraphLabVertexProgram, typename ReductionType,
647  typename EdgeMapType,
648  typename FinalizerType>
649  struct GraphLabEdgeAggregatorWrapper : public GraphChiProgram<bool, typename GraphLabVertexProgram::edge_data_type> {
650  typedef bool VertexDataType; /* Temporary hack: as the vertices are stored in memory, no need to store on disk. */
651  typedef typename GraphLabVertexProgram::vertex_data_type GLVertexDataType;
652  typedef typename GraphLabVertexProgram::edge_data_type EdgeDataType;
653  typedef typename GraphLabVertexProgram::edge_type edge_type;
654  typedef typename GraphLabVertexProgram::gather_type gather_type;
656  typedef typename GraphLabVertexProgram::message_type message_type;
657 
     // Not used by any code visible in this listing.
658  mutex m;
     // One partial sum per execution thread, indexed by omp_get_thread_num().
659  std::vector<ReductionType> localaggr;
     // Final result, filled in by after_iteration().
660  ReductionType aggr;
661  std::vector<GLVertexDataType> * vertexInmemoryArray;
662  EdgeMapType map_function;
663  FinalizerType finalize_function;
664 
665  GraphLabEdgeAggregatorWrapper(EdgeMapType map_function,
666  FinalizerType finalize_function,
667  std::vector<typename GraphLabVertexProgram::vertex_data_type> * vertices) : map_function(map_function),
668  finalize_function(finalize_function) {
669  vertexInmemoryArray = vertices;
670  }
671 
     // Reset the result and make room for one partial sum per thread.
675  virtual void before_iteration(int iteration, graphchi_context &gcontext) {
676  aggr = ReductionType();
677  localaggr.resize(gcontext.execthreads);
678  }
679 
     // Merge the per-thread partial sums and hand the total to the finalizer.
683  virtual void after_iteration(int iteration, graphchi_context &gcontext) {
684  logstream(LOG_INFO) << "Going to run edge-aggregator finalize." << std::endl;
685 
686  for(int i=0; i < (int)localaggr.size(); i++) {
687  aggr += localaggr[i];
688  }
689 
691  finalize_function(glcontext, aggr);
692  }
693 
     // No per-interval setup is needed.
697  virtual void before_exec_interval(vid_t window_st, vid_t window_en, graphchi_context &gcontext) {
698  }
699 
     // No per-interval teardown is needed.
703  virtual void after_exec_interval(vid_t window_st, vid_t window_en, graphchi_context &gcontext) {
704  }
705 
     // --- body of update(): maps each edge of the vertex and adds the
     //     result to the calling thread's partial sum ---
     // Value-initialize the accumulator, matching `aggr = ReductionType()`
     // above; a scalar ReductionType would otherwise start from an
     // indeterminate value.
711  ReductionType a = ReductionType();
712  for(int i=0; i < vertex.num_edges(); i++) {
     // NOTE(review): every edge is wrapped with is_inedge == true, even
     // though edge(i) is indexed up to num_edges(); if that range includes
     // out-edges, source()/target() on the wrapper will be swapped for
     // them -- confirm against graphchi_vertex::edge().
713  const GraphLabEdgeWrapper<GLVertexDataType, EdgeDataType> edgeWrapper(vertex.edge(i), &vertex, vertexInmemoryArray, true);
714  ReductionType mapped = map_function(glcontext, edgeWrapper);
715  a += mapped;
716  }
717  localaggr[omp_get_thread_num()] += a;
718  }
719  }; // End edge-aggregator wrapper
720 
721 
722 
/* Two message types that carry a double `value`, expose it as priority() and
   differ only in how operator+= combines two messages: sum_priority adds the
   values, max_priority keeps the larger one.
   NOTE(review): the `struct` header lines (listing lines 729-732 and 742-745)
   are absent from this listing. */
727  namespace messages {
728 
733  double value;
     // Also acts as an implicit conversion from double; defaults to 0.
734  sum_priority(const double value = 0) : value(value) { }
735  double priority() const { return value; }
     // Combine by addition.
736  sum_priority& operator+=(const sum_priority& other) {
737  value += other.value;
738  return *this;
739  }
740  }; // end of sum_priority message
741 
746  double value;
     // Also acts as an implicit conversion from double; defaults to 0.
747  max_priority(const double value = 0) : value(value) { }
748  double priority() const { return value; }
     // Combine by keeping the larger of the two values.
749  max_priority& operator+=(const max_priority& other) {
750  value = std::max(value, other.value);
751  return *this;
752  }
753  }; // end of max_priority message
754 
755 
756  }; // end of messages namespace
757 
758 
759 
760 
761 }; // End namespace graphlab
762 
/* Runs a GraphLab vertex program on the GraphChi engine.
   Builds a wrapper program and an engine over `base_filename` with `nshards`
   shards, applies the two modifies-edges flags, runs `niters` iterations and
   returns the wrapper's in-memory vertex data array.
   `scheduler` and `_m` are passed straight to the engine constructor.
   The returned vector is the one GraphLabWrapper allocates with `new`;
   nothing visible in this listing deletes it.
   NOTE(review): presumably the caller takes ownership of the returned
   pointer -- confirm.
   NOTE(review): listing line 767 (presumably the GLWrapper typedef) is absent
   from this listing. */
763 template <typename GraphLabVertexProgram>
764  std::vector<typename GraphLabVertexProgram::vertex_data_type> *
765  run_graphlab_vertexprogram(std::string base_filename, int nshards, int niters, bool scheduler, metrics & _m,
766  bool modifies_inedges=true, bool modifies_outedges=true) {
768  GLWrapper wrapperProgram;
769  graphchi_engine<bool, typename GLWrapper::EdgeDataType> engine(base_filename, nshards, scheduler, _m);
770  engine.set_modifies_inedges(modifies_inedges);
771  engine.set_modifies_outedges(modifies_outedges);
772  engine.run(wrapperProgram, niters);
773  return wrapperProgram.vertexInmemoryArray;
774 }
775 
/* Runs a single-iteration edge aggregation on the GraphChi engine.
   Wraps `map_function`, `finalize_function` and the caller's vertex data in
   an aggregator program, runs it for one iteration with both modifies-edges
   flags turned off, and returns the aggregator's final `aggr` value by copy.
   The engine is constructed with its third argument set to true (the same
   position run_graphlab_vertexprogram uses for its `scheduler` flag).
   NOTE(review): listing line 782 (presumably the GLEdgeAggrWrapper typedef)
   is absent from this listing. */
776 template <typename GraphLabVertexProgram, typename ReductionType,
777  typename EdgeMapType,
778  typename FinalizerType>
779 ReductionType run_graphlab_edge_aggregator(std::string base_filename, int nshards,
780  EdgeMapType map_function,
781  FinalizerType finalize_function, std::vector<typename GraphLabVertexProgram::vertex_data_type> * vertices, metrics & _m) {
783  logstream(LOG_INFO) << "Starting edge aggregator." << std::endl;
784  GLEdgeAggrWrapper glAggregator(map_function, finalize_function, vertices);
785  graphchi_engine<bool, typename GLEdgeAggrWrapper::EdgeDataType> engine(base_filename, nshards, true, _m);
786  engine.set_modifies_inedges(false);
787  engine.set_modifies_outedges(false);
788  engine.run(glAggregator, 1);
789  return glAggregator.aggr;
790 }
791 
792 
793 #endif
794