31 #ifndef DEF_STRIPEDIO_HPP
32 #define DEF_STRIPEDIO_HPP
48 #include "util/synchronized_queue.hpp"
56 static size_t get_filesize(std::string filename);
64 std::vector<int> readdescs;
65 std::vector<int> writedescs;
72 enum BLOCK_ACTION { READ, WRITE };
// Constructs a refcountptr holding the raw buffer pointer `ptr` together with
// an initial reference count `count`. Both values are stored as-is.
78 refcountptr(
char * ptr,
int count) : ptr(ptr), count(count) {}
// Default constructor: an empty READ task (fd 0, no buffer, zero length and
// offsets, free_after disabled, no owning I/O manager).
96 iotask() : action(READ), fd(0), ptr(NULL), length(0), offset(0), ptroffset(0), free_after(
false), iomgr(NULL) {}
// Full constructor: every argument is copied into the member of the same name
// (`act` goes to `action`). `free_after` defaults to false.
97 iotask(
stripedio * iomgr, BLOCK_ACTION act,
int fd,
refcountptr * ptr,
size_t length,
size_t offset,
size_t ptroffset,
bool free_after=
false) :
98 action(act), fd(fd), ptr(ptr),length(length), offset(offset), ptroffset(ptroffset), free_after(free_after), iomgr(iomgr) {}
108 volatile int pending_writes;
109 volatile int pending_reads;
114 static void * io_thread_loop(
void * _info);
// Describes one piece of a striped request: the index of the I/O thread/queue
// that handles it (`mplex_thread`), its byte offset and its length.
120 stripe_chunk(
int mplex_thread,
size_t offset,
size_t len) : mplex_thread(mplex_thread), offset(offset), len(len) {}
128 volatile size_t curpos;
135 std::string filename;
142 static void * stream_read_loop(
void * _info);
147 std::vector<io_descriptor *> sessions;
152 std::string multiplex_root;
153 bool disable_preloading;
155 std::vector< synchronized_queue<iotask> > mplex_readtasks;
156 std::vector< synchronized_queue<iotask> > mplex_writetasks;
157 std::vector< synchronized_queue<iotask> > mplex_priotasks;
158 std::vector< pthread_t > threads;
159 std::vector< thrinfo * > thread_infos;
163 std::vector<pinned_file *> preloaded_files;
165 size_t preloaded_bytes;
166 size_t max_preload_bytes;
// NOTE(review): the enclosing function header is not visible here; this
// presumably is the body of the stripedio constructor -- confirm.
// Reads the I/O configuration options and starts the I/O worker threads.
173 disable_preloading =
false;
// Block size defaults to 1 MiB; stripe size defaults to half a block.
174 blocksize = get_option_int(
"io.blocksize", 1024 * 1024);
175 stripesize = get_option_int(
"io.stripesize", blocksize/2);
// The preload budget is configured in megabytes and stored in bytes; the
// default of 0 yields a zero budget.
177 max_preload_bytes = 1024 * 1024 * get_option_long(
"preload.max_megabytes", 0);
179 if (max_preload_bytes > 0) {
180 logstream(
LOG_INFO) <<
"Preloading maximum " << max_preload_bytes <<
" bytes." << std::endl;
183 multiplex = get_option_int(
"multiplex", 1);
185 multiplex_root = get_option_string(
"multiplex_root",
"<not-set>");
// NOTE(review): the condition guarding this 1 GiB override is not visible.
188 stripesize = 1024*1024*1024;
190 m.set(
"stripesize", (
size_t)stripesize);
193 niothreads = get_option_int(
"niothreads", 1);
194 m.set(
"niothreads", (
size_t)niothreads);
// One iteration per (multiplex, I/O thread) pair; the loop body is not visible.
197 for(
int i=0; i<multiplex * niothreads; i++) {
// Create one worker per (multiplex i, thread j) pair and wire it to the
// write/read/priority queues at index k (k is computed on a line not shown).
204 for(
int i=0; i < multiplex; i++) {
205 for(
int j=0; j < niothreads; j++) {
207 cthreadinfo->commitqueue = &mplex_writetasks[k];
208 cthreadinfo->readqueue = &mplex_readtasks[k];
209 cthreadinfo->prioqueue = &mplex_priotasks[k];
210 cthreadinfo->running =
true;
211 cthreadinfo->pending_writes = 0;
212 cthreadinfo->pending_reads = 0;
213 cthreadinfo->mplex = i;
215 thread_infos.push_back(cthreadinfo);
// The worker runs io_thread_loop with its thrinfo as argument.
218 int ret = pthread_create(&iothread, NULL, io_thread_loop, cthreadinfo);
219 threads.push_back(iothread);
// NOTE(review): the enclosing function header is not visible here; this
// presumably is the body of the stripedio destructor -- confirm.
// Stops the I/O workers, joins them, then releases per-thread state, sessions
// and preloaded file buffers.
227 int mplex = (int) thread_infos.size();
// Clear each worker's `running` flag, which io_thread_loop polls.
229 for(
int i=0; i<mplex; i++) {
230 thread_infos[i]->running=
false;
232 size_t nthreads = threads.size();
// Wait for every worker to exit before freeing the state it uses.
233 for(
unsigned int i=0; i<nthreads; i++) {
234 pthread_join(threads[i], NULL);
236 for(
int i=0; i<mplex; i++) {
237 delete thread_infos[i];
240 for(
int j=0; j<(int)sessions.size(); j++) {
241 if (sessions[j] != NULL) {
247 for(std::vector<pinned_file *>::iterator it=preloaded_files.begin();
248 it != preloaded_files.end(); ++it) {
// The data buffer is allocated with malloc() where the file is preloaded, so
// it must be released with free(); using `delete` on it is undefined behavior.
250 free(preloaded->data);
// Sets the disable_preloading flag to `b`; logs an informational message when
// preloading is being disabled.
255 void set_disable_preloading(
bool b) {
256 disable_preloading = b;
257 if (b) logstream(
LOG_INFO) <<
"Disabled preloading." << std::endl;
// Debug helper: prints the read and write file descriptors of `session` to
// std::cout, one per line, each preceded by the current multiplex count.
264 void print_session(
int session) {
// Read descriptors: one entry is printed per multiplex index.
265 for(
int i=0; i<multiplex; i++) {
266 std::cout <<
"multiplex: " << multiplex << std::endl;
267 std::cout <<
"Read desc: " << sessions[session]->readdescs[i] << std::endl;
// Write descriptors: iterate over however many were opened.
270 for(
int i=0; i<(int)sessions[session]->writedescs.size(); i++) {
271 std::cout <<
"multiplex: " << multiplex << std::endl;
// Label fixed: these are write descriptors (previously printed as "Read desc: ").
272 std::cout <<
"Write desc: " << sessions[session]->writedescs[i] << std::endl;
// Computes an integer hash of `filename` with a multiply-by-31 polynomial over
// its characters and returns the absolute value of the result.
// NOTE(review): the declaration of the local accumulator `hash` is on a line
// not visible here. If it is a signed int, `31*hash + c` can overflow for long
// names (undefined behavior) and std::abs(INT_MIN) is itself undefined, so the
// result may be negative. open_session uses `hash(filename) % multiplex` as a
// starting multiplex index -- confirm and consider an unsigned accumulator.
279 int hash(std::string filename) {
280 const char * cstr = filename.c_str();
282 int l = (int) strlen(cstr);
283 for(
int i=0; i<l; i++) {
284 hash = 31*hash + cstr[i];
286 return std::abs(hash);
// Opens an I/O session for `filename` and registers its descriptor in
// `sessions`. The session id is the index the new descriptor gets in
// `sessions`.
// `readonly` defaults to false.
// NOTE(review): the lines that allocate `iodesc`, test `readonly` and return
// are not visible here.
289 int open_session(std::string filename,
bool readonly=
false) {
292 int session_id = (int) sessions.size();
// Non-NULL when the file is found among the preloaded files.
295 iodesc->pinned_to_memory = is_preloaded(filename);
// The starting multiplex index is derived from the file name hash.
296 iodesc->start_mplex = hash(filename) % multiplex;
297 sessions.push_back(iodesc);
300 if (NULL != iodesc->pinned_to_memory) {
301 logstream(
LOG_INFO) <<
"Opened preloaded session: " << filename << std::endl;
// For each multiplex directory, open one descriptor per I/O thread. When
// multiplex == 1 one additional descriptor is opened.
305 for(
int i=0; i<multiplex; i++) {
306 std::string fname = multiplexprefix(i) + filename;
307 for(
int j=0; j<niothreads+(multiplex == 1 ? 1 : 0); j++) {
308 int rddesc = open(fname.c_str(), O_RDONLY);
309 if (rddesc < 0) logstream(
LOG_ERROR) <<
"Could not open: " << fname <<
" session: " << session_id
310 <<
" error: " << strerror(errno) << std::endl;
312 iodesc->readdescs.push_back(rddesc);
// F_NOCACHE is set on the descriptor.
// NOTE(review): presumably inside a platform guard that is not visible.
315 fcntl(rddesc, F_NOCACHE, 1);
318 int wrdesc = open(fname.c_str(), O_RDWR);
320 if (wrdesc < 0) logstream(
LOG_ERROR) <<
"Could not open for writing: " << fname <<
" session: " << session_id
321 <<
" error: " << strerror(errno) << std::endl;
324 fcntl(wrdesc, F_NOCACHE, 1);
327 iodesc->writedescs.push_back(wrdesc);
331 iodesc->filename = filename;
// Log which kind of session was opened, with its first descriptor.
332 if (iodesc->writedescs.size() > 0) {
333 logstream(
LOG_INFO) <<
"Opened write-session: " << session_id <<
"(" << iodesc->writedescs[0] <<
") for " << filename << std::endl;
335 logstream(
LOG_INFO) <<
"Opened read-session: " << session_id <<
"(" << iodesc->readdescs[0] <<
") for " << filename << std::endl;
// Closes the session with id `session`: records whether it was open, marks it
// closed and walks its read and write descriptors.
// NOTE(review): the loop bodies are not visible; presumably each descriptor
// is close()d -- confirm.
341 void close_session(
int session) {
348 wasopen = iodesc->open;
349 iodesc->open =
false;
352 for(std::vector<int>::iterator it=iodesc->readdescs.begin(); it!=iodesc->readdescs.end(); ++it) {
355 for(std::vector<int>::iterator it=iodesc->writedescs.begin(); it!=iodesc->writedescs.end(); ++it) {
// Maps byte offset `off` of `session` to a multiplex index: the stripe number
// (off / stripesize) is shifted by the session's start_mplex and wrapped
// modulo `multiplex`.
// NOTE(review): the stripe number is narrowed to int before the addition.
361 int mplex_for_offset(
int session,
size_t off) {
362 return ((
int) (off / stripesize) + sessions[session]->start_mplex) % multiplex;
// Splits the byte range [off, off+nbytes) of `session` into chunks, one
// stripe_chunk per piece, and returns them.
// NOTE(review): the loop header and the definitions of `idx` and `bufoff` are
// on lines not visible here.
366 std::vector< stripe_chunk > stripe_offsets(
int session,
size_t nbytes,
size_t off) {
367 size_t end = off+nbytes;
370 std::vector<stripe_chunk> stripelist;
// Position inside the current stripe, and how much of the request fits before
// either the stripe or the request ends.
372 size_t blockoff = idx%stripesize;
373 size_t blocklen = std::min(stripesize-blockoff, end-idx);
// Queue index: the multiplex for this offset times niothreads, plus a randomly
// chosen I/O thread within that multiplex.
375 int mplex_thread = (int) mplex_for_offset(session, idx) * niothreads + (int) (random() % niothreads);
376 stripelist.push_back(
stripe_chunk(mplex_thread, bufoff, blocklen));
// Queues an asynchronous read of `nbytes` bytes at offset `off` of `session`
// into `tbuf`. The request is split into stripe chunks; for each chunk the
// target thread's pending_reads counter is atomically incremented and a READ
// task is pushed to that thread's read queue.
// NOTE(review): `refptr` and `chunk` are defined on lines not visible here.
384 template <
typename T>
385 void preada_async(
int session, T * tbuf,
size_t nbytes,
size_t off) {
386 std::vector<stripe_chunk> stripelist = stripe_offsets(session, nbytes, off);
388 for(
int i=0; i<(int)stripelist.size(); i++) {
390 __sync_add_and_fetch(&thread_infos[chunk.mplex_thread]->pending_reads, 1);
// File offset is chunk.offset+off; chunk.offset is the offset into the buffer.
391 mplex_readtasks[chunk.mplex_thread].push(
iotask(
this, READ, sessions[session]->readdescs[chunk.mplex_thread],
392 refptr, chunk.len, chunk.offset+off, chunk.offset));
399 int ret = pthread_create(&t, NULL, stream_read_loop, (
void*)task);
409 return sessions[session]->pinned_to_memory;
418 if (disable_preloading) {
422 size_t filesize = get_filesize(filename);
423 if (preloaded_bytes + filesize <= max_preload_bytes) {
424 preloaded_bytes += filesize;
425 m.set(
"preload_bytes", preloaded_bytes);
428 pfile->filename = filename;
429 pfile->length = filesize;
430 pfile->data = (uint8_t*) malloc(filesize);
431 pfile->touched =
false;
432 assert(pfile->data != NULL);
434 int fid = open(filename.c_str(), O_RDONLY);
436 logstream(
LOG_ERROR) <<
"Could not read file: " << filename
437 <<
" error: " << strerror(errno) << std::endl;
441 logstream(
LOG_INFO) <<
"Preloading: " << filename << std::endl;
442 preada(fid, pfile->data, filesize, 0);
444 preloaded_files.push_back(pfile);
446 preload_lock.unlock();
// Writes every preloaded file whose `touched` flag is set back to disk, in
// full, and clears the flag afterwards.
// NOTE(review): the line binding `preloaded` to the current iterator element
// and the check on `fid` are not visible here.
449 void commit_preloaded() {
450 for(std::vector<pinned_file *>::iterator it=preloaded_files.begin();
451 it != preloaded_files.end(); ++it) {
453 if (preloaded->touched) {
454 logstream(
LOG_INFO) <<
"Commit preloaded file: " << preloaded->filename << std::endl;
// The file is opened write-only, so a failure here is a write-open failure;
// the message previously claimed a read failure.
455 int fid = open(preloaded->filename.c_str(), O_WRONLY);
457 logstream(
LOG_ERROR) <<
"Could not open file for writing: " << preloaded->filename
458 <<
" error: " << strerror(errno) << std::endl;
// Write the whole in-memory copy starting at file offset 0.
461 pwritea(fid, preloaded->data, preloaded->length, 0);
464 preloaded->touched =
false;
// Looks up `filename` among the preloaded files by exact name comparison.
// `preloaded` starts as NULL; preload_lock is released before returning.
// NOTE(review): the lock acquisition, the assignment on a match and the return
// statement are on lines not visible here.
468 pinned_file * is_preloaded(std::string filename) {
470 pinned_file * preloaded = NULL;
471 for(std::vector<pinned_file *>::iterator it=preloaded_files.begin();
472 it != preloaded_files.end(); ++it) {
473 if (filename == (*it)->filename) {
478 preload_lock.unlock();
// Queues an asynchronous write of `nbytes` bytes from `tbuf` to offset `off`
// of `session`. The request is split into stripe chunks that share one
// refcountptr whose count starts at the number of chunks. `free_after` is
// forwarded to every queued task.
484 template <
typename T>
485 void pwritea_async(
int session, T * tbuf,
size_t nbytes,
size_t off,
bool free_after) {
486 std::vector<stripe_chunk> stripelist = stripe_offsets(session, nbytes, off);
487 refcountptr * refptr =
new refcountptr((
char*)tbuf, (
int) stripelist.size());
488 for(
int i=0; i<(int)stripelist.size(); i++) {
489 stripe_chunk chunk = stripelist[i];
// Count the task as pending before it becomes visible to the worker.
490 __sync_add_and_fetch(&thread_infos[chunk.mplex_thread]->pending_writes, 1);
491 mplex_writetasks[chunk.mplex_thread].push(iotask(
this, WRITE, sessions[session]->writedescs[chunk.mplex_thread],
492 refptr, chunk.len, chunk.offset+off, chunk.offset, free_after));
// Blocking read of `nbytes` bytes at offset `off` of `session` into `tbuf`,
// timed under the metric "preada_now".
// NOTE(review): the condition selecting between the two paths below is not
// visible here.
496 template <
typename T>
497 void preada_now(
int session, T * tbuf,
size_t nbytes,
size_t off) {
498 metrics_entry me = m.start_time();
// Path 1: split into stripe chunks and push them to the priority queues.
501 std::vector<stripe_chunk> stripelist = stripe_offsets(session, nbytes, off);
503 refcountptr * refptr =
new refcountptr((
char*)tbuf, (
int) stripelist.size());
505 for(
int i=0; i < (int)stripelist.size(); i++) {
506 stripe_chunk chunk = stripelist[i];
507 __sync_add_and_fetch(&thread_infos[chunk.mplex_thread]->pending_reads, 1);
510 mplex_priotasks[chunk.mplex_thread].push(iotask(
this, READ, sessions[session]->readdescs[chunk.mplex_thread],
511 refptr, chunk.len, chunk.offset+off, chunk.offset));
512 checklen += chunk.len;
// The chunks must cover the request exactly.
514 assert(checklen == nbytes);
// Wait while the shared count is above 1; the loop body is not visible.
517 while(refptr->count>1) {
// Path 2: direct read through the read descriptor at index threads.size().
// NOTE(review): presumably the extra descriptor that open_session opens when
// multiplex == 1 -- confirm.
522 preada(sessions[session]->readdescs[threads.size()], tbuf, nbytes, off);
524 m.stop_time(me,
"preada_now",
false);
// Blocking write of `nbytes` bytes from `tbuf` to offset `off` of `session`,
// timed under the metric "pwritea_now". Each stripe chunk is written directly
// from the calling thread through that chunk's write descriptor.
// NOTE(review): `checklen` is declared on a line not visible here.
527 template <
typename T>
528 void pwritea_now(
int session, T * tbuf,
size_t nbytes,
size_t off) {
529 metrics_entry me = m.start_time();
530 std::vector<stripe_chunk> stripelist = stripe_offsets(session, nbytes, off);
533 for(
int i=0; i<(int)stripelist.size(); i++) {
534 stripe_chunk chunk = stripelist[i];
// chunk.offset is the offset into the buffer; chunk.offset+off the file offset.
535 pwritea(sessions[session]->writedescs[chunk.mplex_thread], (
char*)tbuf+chunk.offset, chunk.len, chunk.offset+off);
536 checklen += chunk.len;
// The chunks must cover the request exactly.
538 assert(checklen == nbytes);
539 m.stop_time(me,
"pwritea_now",
false);
549 template <
typename T>
552 pwritea_async(session, *tbuf, nbytes, off, free_after);
555 sessions[session]->pinned_to_memory->touched =
true;
// Read variant that takes the buffer by pointer-to-pointer. One path reads
// into *tbuf with preada_now; the other redirects *tbuf to point `off` bytes
// into the session's pinned in-memory data, without copying.
// NOTE(review): the branch condition is not visible; presumably it tests
// whether the session is pinned -- confirm.
559 template <
typename T>
560 void managed_preada_now(
int session, T ** tbuf,
size_t nbytes,
size_t off) {
562 preada_now(session, *tbuf, nbytes, off);
564 io_descriptor * iodesc = sessions[session];
565 *tbuf = (T*) (iodesc->pinned_to_memory->data + off);
// Write variant that takes the buffer by pointer-to-pointer. One path writes
// *tbuf with pwritea_now; the other only marks the session's pinned file as
// touched.
// NOTE(review): the branch condition is not visible; presumably it tests
// whether the session is pinned -- confirm.
569 template <
typename T>
570 void managed_pwritea_now(
int session, T ** tbuf,
size_t nbytes,
size_t off) {
572 pwritea_now(session, *tbuf, nbytes, off);
575 sessions[session]->pinned_to_memory->touched =
true;
// Provides a buffer for `session` through *tbuf. One path allocates `nbytes`
// with malloc; the other points *tbuf `noff` bytes into the session's pinned
// in-memory data.
// NOTE(review): the branch condition and the template header are on lines not
// visible here.
580 void managed_malloc(
int session, T ** tbuf,
size_t nbytes,
size_t noff) {
582 *tbuf = (T*) malloc(nbytes);
584 io_descriptor * iodesc = sessions[session];
585 *tbuf = (T*) (iodesc->pinned_to_memory->data + noff);
// Asynchronous counterpart of managed_preada_now. One path queues the read
// with preada_async; the other redirects *tbuf to point `off` bytes into the
// session's pinned in-memory data.
// NOTE(review): the branch condition is not visible here.
589 template <
typename T>
590 void managed_preada_async(
int session, T ** tbuf,
size_t nbytes,
size_t off) {
592 preada_async(session, *tbuf, nbytes, off);
594 io_descriptor * iodesc = sessions[session];
595 *tbuf = (T*) (iodesc->pinned_to_memory->data + off);
// Releases a buffer for `session`. Asserts that *ptr is non-NULL.
// NOTE(review): the release logic itself is on lines not visible here.
599 template <
typename T>
600 void managed_release(
int session, T ** ptr) {
602 assert(*ptr != NULL);
// Truncates the file behind `session` to `nbytes` bytes using ftruncate on the
// session's first write descriptor. Asserts that multiplex is at most 1. An
// error with strerror(errno) is logged.
// NOTE(review): the check on `stat` is not visible here.
609 void truncate(
int session,
size_t nbytes) {
611 assert(multiplex <= 1);
612 int stat = ftruncate(sessions[session]->writedescs[0], nbytes);
614 logstream(
LOG_ERROR) <<
"Could not truncate " << sessions[session]->filename <<
615 " error: " << strerror(errno) << std::endl;
// Blocks until every I/O thread's pending_reads counter has dropped to zero,
// timed under the metric "stripedio_wait_for_reads".
// NOTE(review): the body of the wait loop is not visible here.
620 void wait_for_reads() {
621 metrics_entry me = m.start_time();
623 int mplex = (int) thread_infos.size();
624 for(
int i=0; i<mplex; i++) {
625 while(thread_infos[i]->pending_reads > 0) {
630 m.stop_time(me,
"stripedio_wait_for_reads",
false);
// Blocks until every I/O thread's pending_writes counter has dropped to zero,
// timed under the metric "stripedio_wait_for_writes".
// NOTE(review): the body of the wait loop is not visible here.
633 void wait_for_writes() {
634 metrics_entry me = m.start_time();
635 int mplex = (int) thread_infos.size();
636 for(
int i=0; i<mplex; i++) {
637 while(thread_infos[i]->pending_writes>0) {
641 m.stop_time(me,
"stripedio_wait_for_writes",
false);
// Builds the directory prefix for a stripe: multiplex_root followed by
// "<k>/", where k = 1 + stripe % multiplex (directories are numbered from 1).
// NOTE(review): the declaration of `mstr` is not visible; sprintf is unbounded,
// so confirm the buffer is large enough for any int.
645 std::string multiplexprefix(
int stripe) {
648 sprintf(mstr,
"%d/", 1+stripe%multiplex);
649 return multiplex_root + std::string(mstr);
// Returns the directory prefix of a randomly chosen multiplex index.
// The cast applies to random() before the modulo is taken.
653 std::string multiplexprefix_random() {
654 return multiplexprefix((
int)random() % multiplex);
// Entry point of an I/O worker thread; `_info` is the thread's thrinfo.
// Runs while info->running is set, popping tasks from the priority, read and
// commit queues and executing them.
// NOTE(review): the conditions between the three safepop calls and the idle
// handling are on lines not visible here.
659 static void * io_thread_loop(
void * _info) {
661 thrinfo * info = (thrinfo*)_info;
662 logstream(
LOG_INFO) <<
"Thread for multiplex :" << info->mplex <<
" starting." << std::endl;
663 while(info->running) {
// The priority queue is consulted only when reads are pending.
665 if (info->pending_reads>0) {
666 success = info->prioqueue->safepop(&task);
668 success = info->readqueue->safepop(&task);
671 success = info->commitqueue->safepop(&task);
674 if (task.action == WRITE) {
675 metrics_entry me = info->m->start_time();
// Write this chunk from its position in the shared buffer.
677 pwritea(task.fd, task.ptr->ptr + task.ptroffset, task.length, task.offset);
678 if (task.free_after) {
// The shared count is decremented atomically; the branch taken when it
// reaches zero is not visible here.
680 if (__sync_sub_and_fetch(&task.ptr->count, 1) == 0) {
685 __sync_sub_and_fetch(&info->pending_writes, 1);
686 info->m->stop_time(me,
"commit_thr");
// READ task: fill this chunk's part of the shared buffer.
688 preada(task.fd, task.ptr->ptr+task.ptroffset, task.length, task.offset);
689 __sync_sub_and_fetch(&info->pending_reads, 1);
690 if (__sync_sub_and_fetch(&task.ptr->count, 1) == 0) {
// Thread entry point that streams a session's data in pieces of at most
// 32 MiB, advancing task->curpos atomically after each piece.
// For a pinned session, curpos is advanced by the full length without reading.
// NOTE(review): `tbuf`, the timing variables and the return are on lines not
// visible here.
702 static void * stream_read_loop(
void * _info) {
703 streaming_task * task = (streaming_task*)_info;
705 gettimeofday(&start, NULL);
706 size_t bufsize = 32*1024*1024;
714 if (task->iomgr->pinned_session(task->session)) {
715 __sync_add_and_fetch(&task->curpos, task->len);
719 while(task->curpos < task->len) {
// Read whatever remains, capped at bufsize.
720 size_t toread = std::min((
size_t)task->len - (
size_t)task->curpos, (
size_t)bufsize);
721 task->iomgr->preada_now(task->session, tbuf + task->curpos, toread, task->curpos);
722 __sync_add_and_fetch(&task->curpos, toread);
725 gettimeofday(&end, NULL);
// Determines the size of `filename` by opening it read-only and seeking to
// the end. An error with strerror(errno) is logged.
// NOTE(review): the check on `f`, the close and the return are on lines not
// visible here.
731 static size_t get_filesize(std::string filename) {
732 std::string fname = filename;
733 int f = open(fname.c_str(), O_RDONLY);
736 logstream(
LOG_ERROR) <<
"Could not open file " << filename <<
" error: " << strerror(errno) << std::endl;
// lseek to SEEK_END returns the resulting offset, i.e. the file size.
740 off_t sz = lseek(f, 0, SEEK_END);