2 #ifndef _Mq_MessageQueueSystem_H_
3 #define _Mq_MessageQueueSystem_H_
5 #include "OGRE/Threading/OgreLightweightMutex.h"
6 #include "OGRE/OgreCommon.h"
7 #include "OGRE/OgreFastArray.h"
// Byte distance from the start of a stored message to its payload: the
// payload is written to / read from (message start + cSizeOfHeader).
// The header itself holds two Ogre::uint32 fields: total size, then message id.
// NOTE(review): the actual value is defined elsewhere; presumably it is at
// least 2 * sizeof(Ogre::uint32) -- confirm against the definition.
18 static const size_t cSizeOfHeader;
// Raw byte buffer in which messages are packed back to back.
20 typedef Ogre::FastArray<unsigned char> MessageArray;
// One byte buffer per destination system, keyed by the destination's pointer.
21 typedef std::map<MessageQueueSystem*, MessageArray> PendingMessageMap;
// Held around every visible access to mIncomingMessages[0].
23 Ogre::LightweightMutex mMessageQueueMutex;
// Messages queued locally for other systems; they are later appended to the
// destination's mIncomingMessages[0] while holding the destination's mutex.
25 PendingMessageMap mPendingOutgoingMessages;
// Two incoming buffers: [0] is the one written under mMessageQueueMutex,
// [1] is swapped with [0] under the mutex and then read without the lock.
26 MessageArray mIncomingMessages[2];
// Appends one message to the end of `queue` as raw bytes.
//
// Layout written for each message, starting at the old end of the queue:
//   [Ogre::uint32 totalSize][Ogre::uint32 messageId] ... payload at
//   offset cSizeOfHeader ... padding up to the aligned totalSize.
//
// The body also uses `messageId` and `msg`; the part of the parameter list
// that declares them is not visible in this view.
// NOTE(review): the payload is copied with memcpy, so T presumably has to be
// trivially copyable -- confirm with callers.
28 template <
typename T>
static void storeMessageToQueue( MessageArray &queue,
// Remember where the new message starts before the buffer grows.
32 const size_t startOffset = queue.size();
// Header plus payload, rounded up by Ogre::alignToNextMultiple (the alignment
// argument is not visible in this view).
35 const size_t totalSize = Ogre::alignToNextMultiple( cSizeOfHeader +
sizeof(T),
// Grow the buffer; every pointer below is re-derived from queue.begin()
// after this call rather than cached beforehand.
37 queue.resize( queue.size() + totalSize );
// Header field 1: total size of this message (header + payload + padding).
// NOTE(review): totalSize is a size_t stored through a uint32 pointer, so the
// value is narrowed implicitly; an explicit cast (and a range check) would
// make the intent clear.
40 *
reinterpret_cast<Ogre::uint32*
>( queue.begin() + startOffset ) = totalSize;
// Header field 2: the message id, placed right after the size field.
41 *
reinterpret_cast<Ogre::uint32*
>( queue.begin() + startOffset +
42 sizeof(Ogre::uint32) ) = messageId;
// Payload: a byte-wise copy of msg placed cSizeOfHeader bytes into the message.
45 T *dstPtr =
reinterpret_cast<T*
>( queue.begin() + startOffset + cSizeOfHeader );
46 memcpy( dstPtr, &msg,
sizeof( T ) );
// Packs the message into the local pending buffer for dstSystem. No mutex is
// taken on this line. operator[] on the map creates an empty buffer the first
// time a given destination is used.
// (The enclosing function's signature is not visible in this view.)
68 storeMessageToQueue( mPendingOutgoingMessages[dstSystem], messageId, msg );
// Delivers the locally queued bytes: walks mPendingOutgoingMessages and, for
// each entry, appends its buffer to the destination's mIncomingMessages[0]
// while holding the destination's mutex.
// The lines that assign dstSystem (presumably from itMap->first) and that
// advance itMap are not visible in this view.
75 PendingMessageMap::iterator itMap = mPendingOutgoingMessages.begin();
76 PendingMessageMap::iterator enMap = mPendingOutgoingMessages.end();
78 while( itMap != enMap )
// The destination's incoming buffer [0] is only touched between lock/unlock.
// NOTE(review): lock/unlock are paired manually; if anything between them
// could throw, the mutex would stay locked -- confirm, or use a scoped guard.
82 dstSystem->mMessageQueueMutex.lock();
// Bulk-append all pending bytes for this destination in one call.
84 dstSystem->mIncomingMessages[0].appendPOD(
85 itMap->second.begin(),
86 itMap->second.end() );
88 dstSystem->mMessageQueueMutex.unlock();
// Empty the local buffer after the lock has been released. In the visible
// lines the map entry itself is kept, only its contents are cleared.
90 itMap->second.clear();
// Stores a message directly into this system's incoming buffer [0], holding
// this system's own mutex for the duration of the write. Unlike the pending
// outgoing path, nothing is buffered per destination here.
// (The function signature is not visible in this view; the body uses
// `messageId` and `msg`.)
103 template <
typename T>
106 mMessageQueueMutex.lock();
107 storeMessageToQueue( mIncomingMessages[0], messageId, msg );
108 mMessageQueueMutex.unlock();
// Consumes the received messages. The two incoming buffers are swapped under
// the mutex, so the lock is held only for the swap; the messages are then
// read from buffer [1] without the lock.
// The loop statement, the per-message dispatch and the iterator advance are
// not visible in this view.
116 mMessageQueueMutex.lock();
// After the swap, [1] holds everything received so far and [0] holds the
// previous contents of [1], which the last visible line of this block clears.
117 mIncomingMessages[0].swap( mIncomingMessages[1] );
118 mMessageQueueMutex.unlock();
120 MessageArray::const_iterator itor = mIncomingMessages[1].begin();
121 MessageArray::const_iterator end = mIncomingMessages[1].end();
// Decode the header: uint32 total size first, then the uint32 message id
// stored sizeof(Ogre::uint32) bytes later.
125 Ogre::uint32 totalSize = *
reinterpret_cast<const Ogre::uint32*
>( itor );
126 Ogre::uint32 messageId = *
reinterpret_cast<const Ogre::uint32*
>( itor +
127 sizeof(Ogre::uint32) );
// Sanity check that the declared size stays inside the buffer.
// NOTE(review): assert is compiled out when NDEBUG is defined, and the header
// above is read before this check runs -- a corrupted size is then unchecked.
129 assert( itor + totalSize <= end &&
"MessageQueue corrupted!" );
// Tail of a second assert; its condition is not visible in this view.
131 "MessageQueue corrupted or invalid message!" );
// The payload starts right after the header.
133 const void *data = itor + cSizeOfHeader;
// Drop the processed bytes so this buffer is empty for the next swap.
138 mIncomingMessages[1].clear();