GraphChi  0.1
 All Classes Namespaces Files Functions Variables Typedefs Enumerations Macros
graphchi_engine.hpp
Go to the documentation of this file.
1 
29 #ifndef DEF_GRAPHCHI_GRAPHCHI_ENGINE
30 #define DEF_GRAPHCHI_GRAPHCHI_ENGINE
31 
32 
33 #include <iostream>
34 #include <fstream>
35 #include <sstream>
36 #include <cstdio>
#include <cstdlib>
41 #include <vector>
39 #include <assert.h>
37 #include <fcntl.h>
40 #include <omp.h>
42 #include <sys/time.h>
38 #include <unistd.h>
43 
44 #include "api/chifilenames.hpp"
45 #include "api/graph_objects.hpp"
46 #include "api/graphchi_context.hpp"
47 #include "api/graphchi_program.hpp"
51 #include "io/stripedio.hpp"
52 #include "logger/logger.hpp"
53 #include "metrics/metrics.hpp"
54 #include "shards/memoryshard.hpp"
55 #include "shards/slidingshard.hpp"
56 #include "util/pthread_tools.hpp"
57 
58 
59 namespace graphchi {
60 
61  template <typename VertexDataType, typename EdgeDataType,
62  typename svertex_t = graphchi_vertex<VertexDataType, EdgeDataType> >
63 
65  public:
68 
69  protected:
70  std::string base_filename;
71  int nshards;
72 
73  /* IO manager */
74  stripedio * iomgr;
75 
76  /* Shards */
77  std::vector<slidingshard_t *> sliding_shards;
78  memshard_t * memoryshard;
79  std::vector<std::pair<vid_t, vid_t> > intervals;
80 
 82  /* Auxiliary data handlers */
82  degree_data * degree_handler;
83  vertex_data_store<VertexDataType> * vertex_data_handler;
84 
85  /* Computational context */
86  graphchi_context chicontext;
87 
88  /* Scheduler */
89  bitset_scheduler * scheduler;
90 
91  /* Configuration */
92  bool modifies_outedges;
93  bool modifies_inedges;
94  bool only_adjacency;
95  bool use_selective_scheduling;
96  bool enable_deterministic_parallelism;
97  bool store_inedges;
98 
99  size_t blocksize;
100  int membudget_mb;
101  int load_threads;
102  int exec_threads;
103 
104  /* State */
105  vid_t sub_interval_st;
106  vid_t sub_interval_en;
107  int iter;
108  int niters;
109  int exec_interval;
110  size_t nupdates;
111  size_t nedges;
112  size_t work; // work is the number of edges processed
113 
114  /* Metrics */
115  metrics &m;
116 
    /* Dump the engine's effective runtime configuration (thread counts,
       memory budget, IO block size, scheduler flag) to the INFO log. */
    void print_config() {
        logstream(LOG_INFO) << "Engine configuration: " << std::endl;
        logstream(LOG_INFO) << " exec_threads = " << exec_threads << std::endl;
        logstream(LOG_INFO) << " load_threads = " << load_threads << std::endl;
        logstream(LOG_INFO) << " membudget_mb = " << membudget_mb << std::endl;
        logstream(LOG_INFO) << " blocksize = " << blocksize << std::endl;
        logstream(LOG_INFO) << " scheduler = " << use_selective_scheduling << std::endl;
    }
125 
126  public:
127 
134  graphchi_engine(std::string _base_filename, int _nshards, bool _selective_scheduling, metrics &_m) : base_filename(_base_filename), nshards(_nshards), use_selective_scheduling(_selective_scheduling), m(_m) {
135  /* Initialize IO */
136  iomgr = new stripedio(m);
137  if (disable_preloading()) {
138  iomgr->set_disable_preloading(true);
139  }
140  logstream(LOG_INFO) << "Initializing graphchi_engine. This engine expects " << sizeof(EdgeDataType)
141  << "-byte edge data. " << std::endl;
142 
143  /* If number of shards is unspecified - discover */
144  if (nshards < 1) {
145  nshards = get_option_int("nshards", 0);
146  if (nshards < 1) {
147  logstream(LOG_WARNING) << "Number of shards was not specified (command-line argument 'nshards'). Trying to detect. " << std::endl;
148  nshards = discover_shard_num();
149  }
150  }
151 
152  /* Initialize a plenty of fields */
153  memoryshard = NULL;
154  modifies_outedges = true;
155  modifies_inedges = true;
156  only_adjacency = false;
157  blocksize = get_option_long("blocksize", 1024 * 1024);
158  membudget_mb = get_option_int("membudget_mb", 1024);
159  nupdates = 0;
160  iter = 0;
161  work = 0;
162  nedges = 0;
163  scheduler = NULL;
164  store_inedges = true;
165  enable_deterministic_parallelism = true;
166  load_threads = get_option_int("loadthreads", 2);
167  exec_threads = get_option_int("execthreads", omp_get_max_threads());
168 
169  /* Load graph shard interval information */
171 
172  _m.set("engine", "default");
173  }
174 
175  virtual ~graphchi_engine() {
176  if (degree_handler != NULL) delete degree_handler;
177  if (vertex_data_handler != NULL) delete vertex_data_handler;
178  if (memoryshard != NULL) {
179  delete memoryshard;
180  memoryshard = NULL;
181  }
182  for(int i=0; i < (int)sliding_shards.size(); i++) {
183  if (sliding_shards[i] != NULL) {
184  delete sliding_shards[i];
185  }
186  sliding_shards[i] = NULL;
187  }
188  degree_handler = NULL;
189  vertex_data_handler = NULL;
190  delete iomgr;
191  }
192 
193 
194  protected:
195 
196  virtual degree_data * create_degree_handler() {
197  return new degree_data(base_filename, iomgr);
198  }
199 
    /* Hook: return true in a subclass to forbid the IO manager from
       preloading shard files. The base engine always allows preloading. */
    virtual bool disable_preloading() {
        return false;
    }
203 
209  int _nshards = find_shards<EdgeDataType>(base_filename);
210  if (_nshards == 0) {
211  logstream(LOG_ERROR) << "Could not find suitable shards - maybe you need to run sharder to create them?" << std::endl;
212  logstream(LOG_ERROR) << "You need to create the shards with edge data-type of size " << sizeof(EdgeDataType) << " bytes." << std::endl;
213  logstream(LOG_ERROR) << "To specify the number of shards, use command-line parameter 'nshards'" << std::endl;
214  assert(0);
215  }
216  return _nshards;
217  }
218 
219 
220  virtual void initialize_sliding_shards() {
221  assert(sliding_shards.size() == 0);
222  for(int p=0; p < nshards; p++) {
223  std::string edata_filename = filename_shard_edata<EdgeDataType>(base_filename, p, nshards);
224  std::string adj_filename = filename_shard_adj(base_filename, p, nshards);
225 
226  /* Let the IO manager know that we will be reading these files, and
227  it should decide whether to preload them or not.
228  */
229  iomgr->allow_preloading(edata_filename);
230  iomgr->allow_preloading(adj_filename);
231 
232  sliding_shards.push_back(
233  new slidingshard_t(iomgr, edata_filename,
234  adj_filename,
235  intervals[p].first,
236  intervals[p].second,
237  blocksize,
238  m,
239  !modifies_outedges,
240  only_adjacency));
241  if (!only_adjacency)
242  nedges += sliding_shards[sliding_shards.size() - 1]->num_edges();
243  }
244 
245  }
246 
247  virtual void initialize_scheduler() {
248  if (use_selective_scheduling) {
249  scheduler = new bitset_scheduler((int) num_vertices());
250  scheduler->add_task_to_all();
251  } else {
252  scheduler = NULL;
253  }
254  }
255 
256 
257 
261  virtual vid_t determine_next_window(vid_t iinterval, vid_t fromvid, vid_t maxvid, size_t membudget) {
262  /* Load degrees */
263  degree_handler->load(fromvid, maxvid);
264 
265  size_t memreq = 0;
266  int max_interval = maxvid - fromvid;
267  for(int i=0; i < max_interval; i++) {
268  degree deg = degree_handler->get_degree(fromvid + i);
269  int inc = deg.indegree;
270  int outc = deg.outdegree;
271 
272  // Raw data and object cost included
273  memreq += sizeof(svertex_t) + (sizeof(EdgeDataType) + sizeof(vid_t) + sizeof(graphchi_edge<EdgeDataType>))*(outc + inc);
274  if (memreq > membudget) {
275  return fromvid + i - 1; // Previous was enough
276  }
277  }
278  return maxvid;
279  }
280 
285  size_t num_edges_subinterval(vid_t st, vid_t en) {
286  size_t num_edges = 0;
287  int nvertices = en - st + 1;
288  if (scheduler != NULL) {
289  for(int i=0; i < nvertices; i++) {
290  bool is_sched = scheduler->is_scheduled(st + i);
291  if (is_sched) {
292  degree d = degree_handler->get_degree(st + i);
293  num_edges += d.indegree * store_inedges + d.outdegree;
294  }
295  }
296  } else {
297  for(int i=0; i < nvertices; i++) {
298  degree d = degree_handler->get_degree(st + i);
299  num_edges += d.indegree * store_inedges + d.outdegree;
300  }
301  }
302  return num_edges;
303  }
304 
    /* Load everything needed before running updates on `vertices`:
       the memory shard of the current interval (edges + vertex values) and
       the relevant edge windows of every other sliding shard. Jobs are
       distributed over load_threads; p == -1 denotes the memory-shard job,
       p in [0, nshards) a sliding shard. Blocks until all reads complete. */
    virtual void load_before_updates(std::vector<svertex_t> &vertices) {
        omp_set_num_threads(load_threads);
#pragma omp parallel for schedule(dynamic, 1)
        for(int p=-1; p < nshards; p++) {
            if (p==(-1)) {
                /* Load memory shard */
                if (!memoryshard->loaded()) {
                    memoryshard->load();
                }

                /* Load vertex edges from memory shard */
                memoryshard->load_vertices(sub_interval_st, sub_interval_en, vertices);

                /* Load vertices */
                vertex_data_handler->load(sub_interval_st, sub_interval_en);
            } else {
                /* Load edges from a sliding shard; the shard of the current
                   exec_interval is covered by the memory shard above. */
                if (p != exec_interval) {
                    sliding_shards[p]->read_next_vertices((int) vertices.size(), sub_interval_st, vertices,
                                                          scheduler != NULL && chicontext.iteration == 0);

                }
            }
        }

        /* Wait for all reads to complete */
        iomgr->wait_for_reads();
    }
333 
    /* Run the user's update function on every scheduled vertex of the
       current subinterval. Two concurrent OMP sections:
         1) a parallel for over vertices marked parallel_safe,
         2) a serial sweep over vertices NOT marked parallel_safe
            (the "deterministic parallelism" mode).
       When enable_deterministic_parallelism is off, every vertex is marked
       parallel_safe up front and section 2 does nothing. With a single exec
       thread, section 1 updates every scheduled vertex regardless. */
    void exec_updates(GraphChiProgram<VertexDataType, EdgeDataType, svertex_t> &userprogram,
                      std::vector<svertex_t> &vertices) {
        metrics_entry me = m.start_time();
        size_t nvertices = vertices.size();
        if (!enable_deterministic_parallelism) {
            for(int i=0; i < (int)nvertices; i++) vertices[i].parallel_safe = true;
        }

        omp_set_num_threads(exec_threads);

#pragma omp parallel sections
        {
#pragma omp section
            {
#pragma omp parallel for schedule(dynamic)
                for(int vid=sub_interval_st; vid <= (int)sub_interval_en; vid++) {
                    svertex_t & v = vertices[vid - sub_interval_st];

                    if (exec_threads == 1 || v.parallel_safe) {
                        v.dataptr = vertex_data_handler->vertex_data_ptr(vid);
                        if (v.scheduled)
                            userprogram.update(v, chicontext);
                    }
                }
            }
#pragma omp section
            {
                if (exec_threads > 1 && enable_deterministic_parallelism) {
                    int nonsafe_count = 0;
                    for(int vid=sub_interval_st; vid <= (int)sub_interval_en; vid++) {
                        svertex_t & v = vertices[vid - sub_interval_st];
                        if (!v.parallel_safe && v.scheduled) {
                            v.dataptr = vertex_data_handler->vertex_data_ptr(vid);
                            userprogram.update(v, chicontext);
                            nonsafe_count++;
                        }
                    }

                    m.add("serialized-updates", nonsafe_count);
                }
            }
        }
        m.stop_time(me, "execute-updates");
    }
378 
379  virtual void init_vertices(std::vector<svertex_t> &vertices, graphchi_edge<EdgeDataType> * &edata) {
380  size_t nvertices = vertices.size();
381 
382  /* Compute number of edges */
383  size_t num_edges = num_edges_subinterval(sub_interval_st, sub_interval_en);
384 
385  /* Allocate edge buffer */
386  edata = (graphchi_edge<EdgeDataType>*) malloc(num_edges * sizeof(graphchi_edge<EdgeDataType>));
387 
388  /* Assign vertex edge array pointers */
389  int ecounter = 0;
390  for(int i=0; i < (int)nvertices; i++) {
391  degree d = degree_handler->get_degree(sub_interval_st + i);
392  int inc = d.indegree;
393  int outc = d.outdegree;
394  vertices[i] = svertex_t(sub_interval_st + i, &edata[ecounter],
395  &edata[ecounter + inc * store_inedges], inc, outc);
396  if (scheduler != NULL) {
397  bool is_sched = scheduler->is_scheduled(sub_interval_st + i);
398  if (is_sched) {
399  vertices[i].scheduled = true;
400  nupdates++;
401  ecounter += inc * store_inedges + outc;
402  }
403  } else {
404  nupdates++;
405  vertices[i].scheduled = true;
406  ecounter += inc * store_inedges + outc;
407  }
408  }
409  work += ecounter;
410  }
411 
412 
413  void save_vertices(std::vector<svertex_t> &vertices) {
414  size_t nvertices = vertices.size();
415  bool modified_any_vertex = false;
416  for(int i=0; i < (int)nvertices; i++) {
417  if (vertices[i].modified) {
418  modified_any_vertex = true;
419  break;
420  }
421  }
422  if (modified_any_vertex) {
423  vertex_data_handler->save();
424  }
425  }
426 
    /* Hook for loading additional data after the update phase (used by the
       functional engine); base engine does nothing. */
    virtual void load_after_updates(std::vector<svertex_t> &vertices) {
        // Do nothing.
    }
430 
431  virtual void write_delta_log() {
432  // Write delta log
433  std::string deltafname = iomgr->multiplexprefix(0) + base_filename + ".deltalog";
434  FILE * df = fopen(deltafname.c_str(), (chicontext.iteration == 0 ? "w" : "a"));
435  fprintf(df, "%d,%lu,%lu,%lf\n", chicontext.iteration, nupdates, work, chicontext.get_delta());
436  fclose(df);
437  }
438 
439  public:
440 
    /* Vertex-id interval (inclusive [first, second]) covered by shard i. */
    virtual std::pair<vid_t, vid_t> get_interval(int i) {
        return intervals[i];
    }

    /* First vertex id of shard i's interval. */
    vid_t get_interval_start(int i) {
        return get_interval(i).first;
    }

    /* Last vertex id of shard i's interval. */
    vid_t get_interval_end(int i) {
        return get_interval(i).second;
    }
452 
    /* Total vertex count: one past the last interval's end id. */
    virtual size_t num_vertices() {
        return 1 + intervals[nshards - 1].second;
    }

    /* Mutable reference to the shared computation context. */
    graphchi_context &get_context() {
        return chicontext;
    }

    /* Cumulative number of vertex updates executed so far. */
    size_t num_updates() {
        return nupdates;
    }

    /* Edge count that is always safe to call; engines that buffer edges
       override this to include in-flight edges. */
    virtual size_t num_edges_safe() {
        return num_edges();
    }

    /* Number of buffered (not yet committed) edges; none in this engine. */
    virtual size_t num_buffered_edges() {
        return 0;
    }
475 
    /* Total number of edges in the graph. Not available (logs an error and
       returns 0) when the engine runs in adjacency-only mode. */
    virtual size_t num_edges() {
        if (only_adjacency) {
            // TODO: fix.
            logstream(LOG_ERROR) << "Asked number of edges, but engine was run without edge-data." << std::endl;
            return 0;
        }
        return nedges;
    }
487 
492  // TODO: support for a minimum fraction of scheduled vertices
493  bool is_any_vertex_scheduled(vid_t st, vid_t en) {
494  if (scheduler == NULL) return true;
495  for(vid_t v=st; v<=en; v++) {
496  if (scheduler->is_scheduled(v)) {
497  return true;
498  }
499  }
500  return false;
501  }
502 
    /* Per-iteration initialization hook for subclasses; base engine no-op. */
    virtual void initialize_iter() {
        // Do nothing
    }

    /* One-time initialization hook, called once at the start of run();
       base engine no-op. */
    virtual void initialize_before_run() {
        // Do nothing
    }
510 
511  virtual memshard_t * create_memshard(vid_t interval_st, vid_t interval_en) {
512  return new memshard_t(this->iomgr,
513  filename_shard_edata<EdgeDataType>(base_filename, exec_interval, nshards),
514  filename_shard_adj(base_filename, exec_interval, nshards),
515  interval_st,
516  interval_en,
517  m);
518  }
519 
526  m.start_time("runtime");
527  degree_handler = create_degree_handler();
528 
529  niters = _niters;
530  logstream(LOG_INFO) << "GraphChi starting" << std::endl;
531  logstream(LOG_INFO) << "Licensed under the Apache License 2.0" << std::endl;
532  logstream(LOG_INFO) << "Copyright Aapo Kyrola et al., Carnegie Mellon University (2012)" << std::endl;
533 
534 
535  vertex_data_handler = new vertex_data_store<VertexDataType>(base_filename, num_vertices(), iomgr);
536  initialize_before_run();
537 
538 
539  /* Setup */
540  initialize_sliding_shards();
541  initialize_scheduler();
542  omp_set_nested(1);
543 
544  /* Print configuration */
545  print_config();
546 
547 
548 
549  /* Main loop */
550  for(iter=0; iter < niters; iter++) {
551  logstream(LOG_INFO) << "Start iteration: " << iter << std::endl;
552 
553  initialize_iter();
554 
555  /* Check vertex data file has the right size (number of vertices may change) */
556  vertex_data_handler->check_size(num_vertices());
557 
558  /* Keep the context object updated */
559  chicontext.filename = base_filename;
560  chicontext.iteration = iter;
561  chicontext.num_iterations = niters;
562  chicontext.nvertices = num_vertices();
563  chicontext.scheduler = scheduler;
564  chicontext.execthreads = exec_threads;
565  chicontext.reset_deltas(exec_threads);
566 
567  /* Call iteration-begin event handler */
568  userprogram.before_iteration(iter, chicontext);
569 
570  /* Check scheduler. If no scheduled tasks, terminate. */
571  if (use_selective_scheduling) {
572  if (scheduler != NULL) {
573  if (!scheduler->has_new_tasks) {
574  logstream(LOG_INFO) << "No new tasks to run!" << std::endl;
575  break;
576  }
577  scheduler->has_new_tasks = false; // Kind of misleading since scheduler may still have tasks - but no new tasks.
578  }
579  }
580 
581  /* Interval loop */
582  for(exec_interval=0; exec_interval < nshards; ++exec_interval) {
583  /* Determine interval limits */
584  vid_t interval_st = get_interval_start(exec_interval);
585  vid_t interval_en = get_interval_end(exec_interval);
586 
587  userprogram.before_exec_interval(interval_st, interval_en, chicontext);
588 
589  /* Flush stream shard for the exec interval */
590  sliding_shards[exec_interval]->flush();
591  iomgr->wait_for_writes(); // Actually we would need to only wait for writes of given shard. TODO.
592 
593  /* Initialize memory shard */
594  if (memoryshard != NULL) delete memoryshard;
595  memoryshard = create_memshard(interval_st, interval_en);
596  memoryshard->only_adjacency = only_adjacency;
597 
598 
599  sub_interval_st = interval_st;
600  logstream(LOG_INFO) << chicontext.runtime() << "s: Starting: "
601  << sub_interval_st << " -- " << interval_en << std::endl;
602 
603  while (sub_interval_st < interval_en) {
604  /* Determine the sub interval */
605  sub_interval_en = determine_next_window(exec_interval,
606  sub_interval_st,
607  interval_en,
608  membudget_mb * 1024 * 1024);
609  assert(sub_interval_en > sub_interval_st);
610 
611  logstream(LOG_INFO) << "Iteration " << iter << "/" << (niters - 1) << ", subinterval: " << sub_interval_st << " - " << sub_interval_en << std::endl;
612 
613  bool any_vertex_scheduled = is_any_vertex_scheduled(sub_interval_st, sub_interval_en);
614  if (!any_vertex_scheduled) {
615  logstream(LOG_INFO) << "No vertices scheduled, skip." << std::endl;
616  sub_interval_st = sub_interval_en + 1;
617  continue;
618  }
619 
620  /* Initialize vertices */
621  int nvertices = sub_interval_en - sub_interval_st + 1;
622  graphchi_edge<EdgeDataType> * edata = NULL;
623  std::vector<svertex_t> vertices(nvertices, svertex_t());
624  init_vertices(vertices, edata);
625 
626  /* Now clear scheduler bits for the interval */
627  if (scheduler != NULL)
628  scheduler->remove_tasks(sub_interval_st, sub_interval_en);
629 
630  /* Load data */
631  load_before_updates(vertices);
632 
633 
634  logstream(LOG_INFO) << "Start updates" << std::endl;
635  /* Execute updates */
636  exec_updates(userprogram, vertices);
637  logstream(LOG_INFO) << "Finished updates" << std::endl;
638 
639  /* Load phase after updates (used by the functional engine) */
640  load_after_updates(vertices);
641 
642  /* Save vertices */
643  save_vertices(vertices);
644 
645  sub_interval_st = sub_interval_en + 1;
646 
647  /* Delete edge buffer. TODO: reuse. */
648  if (edata != NULL) {
649  delete edata;
650  edata = NULL;
651  }
652  } // while subintervals
653 
654  if (memoryshard->loaded()) {
655  logstream(LOG_INFO) << "Commit memshard" << std::endl;
656 
657  memoryshard->commit(modifies_inedges);
658  sliding_shards[exec_interval]->set_offset(memoryshard->offset_for_stream_cont(), memoryshard->offset_vid_for_stream_cont(),
659  memoryshard->edata_ptr_for_stream_cont());
660 
661  delete memoryshard;
662  memoryshard = NULL;
663  }
664 
665  userprogram.after_exec_interval(interval_st, interval_en, chicontext);
666  } // For exec_interval
667 
668  userprogram.after_iteration(iter, chicontext);
669 
670 
671  /* Move the sliding shard of the current interval to correct position and flush
672  writes of all shards for next iteration. */
673  for(int p=0; p<nshards; p++) {
674  sliding_shards[p]->flush();
675  sliding_shards[p]->set_offset(0, 0, 0);
676  }
677  iomgr->wait_for_writes();
678 
679  /* Write progress log */
680  write_delta_log();
681 
682  /* Check if user has defined a last iteration */
683  if (chicontext.last_iteration >= 0) {
684  niters = chicontext.last_iteration + 1;
685  logstream(LOG_DEBUG) << "Last iteration is now: " << (niters-1) << std::endl;
686  }
687  iteration_finished();
688  } // Iterations
689 
690  // Commit preloaded shards
691  iomgr->commit_preloaded();
692 
693  m.stop_time("runtime");
694 
695  m.set("updates", nupdates);
696  m.set("work", work);
697  m.set("nvertices", num_vertices());
698  m.set("execthreads", (size_t)exec_threads);
699  m.set("loadthreads", (size_t)load_threads);
700  m.set("scheduler", (size_t)use_selective_scheduling);
701 
702  // Stop HTTP admin
703  }
704 
    /* Hook invoked at the end of every iteration; base engine no-op. */
    virtual void iteration_finished() {
        // Do nothing
    }

    /* Access the engine's striped IO manager (owned by the engine). */
    stripedio * get_iomanager() {
        return iomgr;
    }
712 
    /* Declare whether the update function writes in-edge values
       (affects how the memory shard is committed). */
    virtual void set_modifies_inedges(bool b) {
        modifies_inedges = b;
    }

    /* Declare whether the update function writes out-edge values. */
    virtual void set_modifies_outedges(bool b) {
        modifies_outedges = b;
    }

    /* Run with adjacency information only (no edge data loaded). */
    virtual void set_only_adjacency(bool b) {
        only_adjacency = b;
    }

    /* Set the sliding-shard IO block size, in bytes. */
    void set_blocksize(size_t blocksize_in_bytes) {
        blocksize = blocksize_in_bytes;
    }

    /* Set the memory budget (in megabytes) for one subinterval's
       vertices and edges. */
    void set_membudget_mb(int mbs) {
        membudget_mb = mbs;
    }

    /* Set the number of threads used for loading shard data. */
    void set_load_threads(int lt) {
        load_threads = lt;
    }

    /* Set the number of threads used for executing updates. */
    void set_exec_threads(int et) {
        exec_threads = et;
    }
751 
757  enable_deterministic_parallelism = b;
758  }
759 
760  protected:
761 
765  virtual void load_vertex_intervals() {
766  char partstr[128];
767  sprintf(partstr, ".%d", nshards);
768 
769  std::string intervalsFilename = filename_intervals(base_filename, nshards);
770  std::ifstream intervalsF(intervalsFilename.c_str());
771 
772  if (!intervalsF.good()) {
773  logstream(LOG_ERROR) << "Could not load intervals-file: " << intervalsFilename << std::endl;
774  }
775  assert(intervalsF.good());
776 
777  intervals.clear();
778 
779  vid_t st=0, en;
780  for(int i=0; i < nshards; i++) {
781  assert(!intervalsF.eof());
782  intervalsF >> en;
783  intervals.push_back(std::pair<vid_t,vid_t>(st, en));
784  st = en + 1;
785  }
786  for(int i=0; i < nshards; i++) {
787  logstream(LOG_INFO) << "shard: " << intervals[i].first << " - " << intervals[i].second << std::endl;
788  }
789 
790  }
791 
792  protected:
793  mutex httplock;
794  std::map<std::string, std::string> json_params;
795 
796  public:
797 
    /* Record a key/value pair for the admin JSON state, guarded by
       httplock so concurrent writers do not corrupt the map. */
    void set_json(std::string key, std::string value) {
        httplock.lock();
        json_params[key] = value;
        httplock.unlock();
    }

    /* Convenience overload: stringify any streamable value and store it. */
    template <typename T>
    void set_json(std::string key, T val) {
        std::stringstream ss;
        ss << val;
        set_json(key, ss.str());
    }
814 
815  std::string get_info_json() {
816  std::stringstream json;
817  json << "{";
818  json << "\"file\" : \"" << base_filename << "\",\n";
819  json << "\"numOfShards\": " << nshards << ",\n";
820  json << "\"iteration\": " << chicontext.iteration << ",\n";
821  json << "\"numIterations\": " << chicontext.num_iterations << ",\n";
822  json << "\"runTime\": " << chicontext.runtime() << ",\n";
823 
824  json << "\"updates\": " << nupdates << ",\n";
825  json << "\"nvertices\": " << chicontext.nvertices << ",\n";
826  json << "\"interval\":" << exec_interval << ",\n";
827  json << "\"windowStart\":" << sub_interval_st << ",";
828  json << "\"windowEnd\": " << sub_interval_en << ",";
829  json << "\"shards\": [";
830 
831  for(int p=0; p < (int)nshards; p++) {
832  if (p>0) json << ",";
833 
834  json << "{";
835  json << "\"p\": " << p << ", ";
836  json << sliding_shards[p]->get_info_json();
837  json << "}";
838  }
839 
840  json << "]";
841  json << "}";
842  return json.str();
843  }
844 
845  };
846 
847 
848 };
849 
850 
851 
852 #endif
853 
854