/* placement copying ring queue (c) 2004 Ross Bencina disclaimer: this code is a proof of concept prototype. do NOT use it, it probably contains errors. */ #include #include #include #include #include #include class placeable_functor_base { public: // no virtual dtor, destroy is used instead // execute the command and return the size of the structure in // size. size is updated even if execute throws an exception // the use of size here avoids the overhead of an additional // virtual call to a size() method virtual void execute_and_destroy( int &size ) = 0; // destroy the object, return its size virtual int destroy() = 0; }; template class placeable_functor : public placeable_functor_base { T f_; public: explicit placeable_functor( const T& f ) : f_( f ) {} virtual void execute_and_destroy( int& size ) { size = sizeof(placeable_functor); //std::cout << "execute\n"; f_(); this->~placeable_functor(); } virtual int destroy() { this->~placeable_functor(); return sizeof(placeable_functor); } }; // this class currently does not protect against the read ptr colliding // with the write pointer. // when read_ == write_ the buffer is empty // if size was a power of 2 the buffer wrapping could perhaps be done // using a bitmask. class ring_buffer{ char *data_, *end_; volatile char *read_, *write_; public: ring_buffer( int size ) : data_( new char[size] ) , end_( data_ + size ) , read_( data_ ) , write_( data_ ) { //std::cout << "ring size: " << size << '\n'; } ~ring_buffer(){ delete [] data_; } bool empty() const { return read_ == write_; } void get_read_available( char* &mem1, int& size1, char* &mem2, int& size2 ){ if( write_ < read_ ){ mem1 = const_cast(read_); size1 = end_ - read_; mem2 = data_; size2 = write_ - data_; }else{ mem1 = const_cast(read_); size1 = write_ - read_; mem2 = 0; size2 = 0; } //std::cout << "get_read_available " << (void*)mem1 << ' ' << size1 << ' ' << (void*)mem2 << ' ' << size2 << '\n'; } void advance_read( int size ){ int a = end_ - read_; // read available before end if( size < a ) read_ += size; else read_ = data_ + (size - a); } void get_write_available( char* &mem1, int& size1, char* &mem2, int& size2 ){ if( read_ <= write_ ){ mem1 = const_cast(write_); size1 = end_ - write_; mem2 = data_; size2 = read_ - data_; }else{ mem1 = const_cast(write_); size1 = read_ - write_; mem2 = 0; size2 = 0; } //std::cout << "get_write_available " << (void*)mem1 << ' ' << size1 << ' ' << (void*)mem2 << ' ' << size2 << '\n'; } void advance_write( int size ){ int a = end_ - write_; // write available before end if( size < a ) write_ += size; else write_ = data_ + (size - a); } }; class dummy_end_functor_1 : public placeable_functor_base { public: virtual void execute_and_destroy( int& size ) { size = sizeof(dummy_end_functor_1); } virtual int destroy() { return sizeof(dummy_end_functor_1); } }; class dummy_end_functor_n : public placeable_functor_base { int size_; public: dummy_end_functor_n( int size ) : size_( size ) { assert( size_ >= sizeof(dummy_end_functor_n) ); } virtual void execute_and_destroy( int& size ) { size = size_; } virtual int destroy() { return size_; } }; class functor_ring_queue{ ring_buffer ring_; enum { ALIGNMENT = sizeof( dummy_end_functor_1 ) }; int round_to_alignment( int x ) { return (((x - 1) / ALIGNMENT) + 1 ) * ALIGNMENT; } char* prepare_write( int size, int& advanceOnCommit ) { char *mem1, *mem2; int size1, size2; ring_.get_write_available( mem1, size1, mem2, size2 ); // always leave a gap of ALIGNMENT bytes before end of buffer int available1 = (size2 != 0 ) ? size1 : size1 - ALIGNMENT; int required = round_to_alignment( size ); if( required <= available1 ){ advanceOnCommit = required; return mem1; }else if( required + ALIGNMENT <= size2 ){ // write dummy functor, it will be discarded // if the write is not commited if( size1 < sizeof( dummy_end_functor_n ) ) new(mem1) dummy_end_functor_1(); else new(mem1) dummy_end_functor_n( size1 ); advanceOnCommit = size1 + required; return mem1; }else{ // not enough contiguous space return 0; } } void commit_write( int advance ) { ring_.advance_write( advance ); } public: functor_ring_queue( int size ) : ring_( round_to_alignment(size) ) {} ~functor_ring_queue() { // destroy remaining functors without executing them char *mem1, *mem2; int size1, size2; ring_.get_read_available( mem1, size1, mem2, size2 ); char *end1 = mem1 + size1; while( mem1 != end1 ){ mem1 += round_to_alignment( reinterpret_cast(mem1)->destroy() ); } char *end2 = mem2 + size2; while( mem2 != end2 ){ mem2 += round_to_alignment( reinterpret_cast(mem2)->destroy() ); } } bool empty() const { return ring_.empty(); } template< class T > bool post( const T &f ) { int advanceOnCommit; if( char *p = prepare_write( sizeof(placeable_functor), advanceOnCommit ) ){ new (p) placeable_functor( f ); commit_write( advanceOnCommit ); return true; }else{ return false; } } bool post( void* (*f)(void) ) { int advanceOnCommit; if( char *p = prepare_write( sizeof(placeable_functor), advanceOnCommit ) ){ new (p) placeable_functor( f ); commit_write( advanceOnCommit ); return true; }else{ return false; } } bool execute_one() { char *mem1, *mem2; int size1, size2; ring_.get_read_available( mem1, size1, mem2, size2 ); if( size1 > 0 ){ placeable_functor_base *f = reinterpret_cast(mem1); int fsize; try{ f->execute_and_destroy( fsize ); }catch(...){ f->destroy(); ring_.advance_read( round_to_alignment(fsize) ); throw; } f->destroy(); ring_.advance_read( round_to_alignment(fsize) ); return true; }else{ return false; } } void execute_all() { char *mem1, *mem2; int size1, size2; ring_.get_read_available( mem1, size1, mem2, size2 ); while( size1 > 0 ){ int bytesRead = 0; char *end1 = mem1 + size1; while( mem1 != end1 ){ placeable_functor_base *f = reinterpret_cast(mem1); int fsize; try{ f->execute_and_destroy( fsize ); }catch(...){ f->destroy(); bytesRead += round_to_alignment(fsize); ring_.advance_read( bytesRead ); throw; } f->destroy(); bytesRead += round_to_alignment(fsize); mem1 += round_to_alignment(fsize); } char *end2 = mem2 + size2; while( mem2 != end2 ){ placeable_functor_base *f = reinterpret_cast(mem2); int fsize; try{ f->execute_and_destroy( fsize ); }catch(...){ f->destroy(); bytesRead += round_to_alignment(fsize); ring_.advance_read( bytesRead ); throw; } f->destroy(); bytesRead += round_to_alignment(fsize); mem2 += round_to_alignment(fsize); } ring_.advance_read( bytesRead ); // an alternate implementation would call //advance_read after executing each functor. ring_.get_read_available( mem1, size1, mem2, size2 ); } } }; // operator <= posts a functor. // it's a shorthand for mq.post( placeable_functor<...>( f ) ) template bool operator<=( functor_ring_queue& mq, const T& f ){ return mq.post( f ); } bool operator<=( functor_ring_queue& mq, void(*f)(void) ){ return mq.post( f ); } // support for forwarding messages via another buffer // eg mq2 <= mq1 <= &foo; // send foo to mq2 via mq1 class functor_queue_forwarding_proxy{ functor_ring_queue &lq_; functor_ring_queue &rq_; template< class T > class functor_queue_poster{ functor_ring_queue &q_; T f_; public: functor_queue_poster( functor_ring_queue& q, const T& f ) : q_( q ), f_( f ) {} void operator()(){ q_ <= f_; } }; public: functor_queue_forwarding_proxy( functor_ring_queue& lq, functor_ring_queue& rq ) : lq_( lq ), rq_( rq ) {} template bool forward( const T& f ) const{ rq_ <= functor_queue_poster( lq_, f ); } }; functor_queue_forwarding_proxy operator<=( functor_ring_queue& lq, functor_ring_queue& rq ){ return functor_queue_forwarding_proxy( lq, rq ); } template bool operator<=( const functor_queue_forwarding_proxy& fp, const T& f ){ return fp.forward( f ); } bool operator<=( const functor_queue_forwarding_proxy& fp, void(*f)(void) ){ return fp.forward( f ); } //-------------------------------------------------------------------------------------------------------- // support classes and functions for tests() class PrintChar { char c_; public: PrintChar( char c ) : c_( c ) {} ~PrintChar(){ //std::cout << "~PrintChar called\n"; } void operator()(){ std::cout << c_ << '\n'; } }; class PrintString { std::string text_; public: PrintString( const char *text ) : text_( text ) {} ~PrintString(){ //std::cout << "~PrintString called\n"; } void operator()(){ std::cout << text_ << '\n'; } }; void PrintHelloFunction() { std::cout << "hello function\n"; } struct TestClass{ TestClass() : j(0) {} TestClass( int j_ ) : j( j_ ) {} int j; void foo( int i ){ std::cout << "TestClass foo() : " << j << ", " << i << "\n"; } void bar(){ std::cout << "bar\n"; } }; //-------------------------------------------------------------------------------------------------------- void Test1() { std::cout << "---------- test 1 --------\n"; functor_ring_queue mq1( 512 ); // non-operator tests for( int i=0; i < 512; ++i ){ mq1.post( PrintString("hello" ) ); mq1.execute_one(); } std::cout << "---------- end test 1 --------\n"; } //-------------------------------------------------------------------------------------------------------- // test queueing messages and then executing them later... void Test2() { std::cout << "---------- test 2 --------\n"; functor_ring_queue mq1( 1024 ); functor_ring_queue mq2( 1024 ); std::cout << "---------- sending --------\n"; // free function mq1 <= &PrintHelloFunction; // nullary function objects mq1 <= PrintString( "hello world" ); mq1 <= PrintChar( 'c' ); TestClass c(99); // bound function call mq1 <= boost::bind( &TestClass::foo, boost::ref(c), 42 ); // messages that requeue messages in another queue mq2 <= mq1 <= &PrintHelloFunction; mq2 <= mq1 <= boost::bind( &TestClass::foo, boost::ref(c), 44); // usually the receive functions would be called from loops // in other threads (an isr callback for example) std::cout << "--------- receiving from mq1 --------\n"; while( mq1.execute_one() ) /* nothing */ ; std::cout << "--------- receiving from mq2 --------\n"; mq2.execute_all(); std::cout << "---------- end test 2 --------\n"; } int main( char *argv[], int argc ) { Test1(); Test2(); return 0; } // notes... /* // possible optimizations to VariablBlockSizeRingBuffer: o- if an upper bound on block size is known, and/or alignment isn't an issue a smaller (ie 1 2 or 3 byte) block size ptr could be used. o- alignment could be a specified parameter o- if some kind of handle idiom was used the buffer could be with the other variables (this is an advantage of JMC's template size parameter o- add counters for number of blocks queued and maximum observed fullness - a vtable entry in placeable_functor_base could be saved by removing the virtual dtor and having a virtual destroy function which returns the size of the object, and also making execute() call the dtor */ /* ring buffer sub types: - continuous (byte) buffer RingBuffer - fixed size (block) buffer FixedSizeBlockRingBuffer (perhaps not necessary, could be implemented with RingBuffer so long as size is a multiple of the block size. - variable block size buffer (requires sentinel for unused space) VariableSizeBlockRingBuffer */