Elaztek Developer Hub
Blamite Game Engine - blam!  00398.09.22.23.2015.blamite
The core library for the Blamite Game Engine.
MessageQueueSystem.h
Go to the documentation of this file.
1 
2 #ifndef _Mq_MessageQueueSystem_H_
3 #define _Mq_MessageQueueSystem_H_
4 
5 #include "OGRE/Threading/OgreLightweightMutex.h"
6 #include "OGRE/OgreCommon.h"
7 #include "OGRE/OgreFastArray.h"
8 #include "MqMessages.h"
9 
10 #include <map>
11 
12 namespace Demo
13 {
14 namespace Mq
15 {
17  {
18  static const size_t cSizeOfHeader;
19 
20  typedef Ogre::FastArray<unsigned char> MessageArray;
21  typedef std::map<MessageQueueSystem*, MessageArray> PendingMessageMap;
22 
23  Ogre::LightweightMutex mMessageQueueMutex;
24 
25  PendingMessageMap mPendingOutgoingMessages;
26  MessageArray mIncomingMessages[2];
27 
28  template <typename T> static void storeMessageToQueue( MessageArray &queue,
29  Mq::MessageId messageId, const T &msg )
30  {
31  //Save the current offset.
32  const size_t startOffset = queue.size();
33 
34  //Enlarge the queue. Preserve alignment.
35  const size_t totalSize = Ogre::alignToNextMultiple( cSizeOfHeader + sizeof(T),
36  sizeof(size_t) );
37  queue.resize( queue.size() + totalSize );
38 
39  //Write the header: the Size and the MessageId
40  *reinterpret_cast<Ogre::uint32*>( queue.begin() + startOffset ) = totalSize;
41  *reinterpret_cast<Ogre::uint32*>( queue.begin() + startOffset +
42  sizeof(Ogre::uint32) ) = messageId;
43 
44  //Write the actual message.
45  T *dstPtr = reinterpret_cast<T*>( queue.begin() + startOffset + cSizeOfHeader );
46  memcpy( dstPtr, &msg, sizeof( T ) );
47  }
48 
49  public:
51  {
52  }
53 
65  template <typename T>
66  void queueSendMessage( MessageQueueSystem *dstSystem, Mq::MessageId messageId, const T &msg )
67  {
68  storeMessageToQueue( mPendingOutgoingMessages[dstSystem], messageId, msg );
69  }
70 
74  {
75  PendingMessageMap::iterator itMap = mPendingOutgoingMessages.begin();
76  PendingMessageMap::iterator enMap = mPendingOutgoingMessages.end();
77 
78  while( itMap != enMap )
79  {
80  MessageQueueSystem *dstSystem = itMap->first;
81 
82  dstSystem->mMessageQueueMutex.lock();
83 
84  dstSystem->mIncomingMessages[0].appendPOD(
85  itMap->second.begin(),
86  itMap->second.end() );
87 
88  dstSystem->mMessageQueueMutex.unlock();
89 
90  itMap->second.clear();
91 
92  ++itMap;
93  }
94 
95  //mPendingOutgoingMessages.clear();
96  }
97 
103  template <typename T>
104  void receiveMessageImmediately( Mq::MessageId messageId, const T &msg )
105  {
106  mMessageQueueMutex.lock();
107  storeMessageToQueue( mIncomingMessages[0], messageId, msg );
108  mMessageQueueMutex.unlock();
109  }
110 
111  protected:
115  {
116  mMessageQueueMutex.lock();
117  mIncomingMessages[0].swap( mIncomingMessages[1] );
118  mMessageQueueMutex.unlock();
119 
120  MessageArray::const_iterator itor = mIncomingMessages[1].begin();
121  MessageArray::const_iterator end = mIncomingMessages[1].end();
122 
123  while( itor != end )
124  {
125  Ogre::uint32 totalSize = *reinterpret_cast<const Ogre::uint32*>( itor );
126  Ogre::uint32 messageId = *reinterpret_cast<const Ogre::uint32*>( itor +
127  sizeof(Ogre::uint32) );
128 
129  assert( itor + totalSize <= end && "MessageQueue corrupted!" );
130  assert( messageId <= Mq::NUM_MESSAGE_IDS &&
131  "MessageQueue corrupted or invalid message!" );
132 
133  const void *data = itor + cSizeOfHeader;
134  processIncomingMessage( static_cast<Mq::MessageId>( messageId ), data );
135  itor += totalSize;
136  }
137 
138  mIncomingMessages[1].clear();
139  }
140 
142  virtual void processIncomingMessage( Mq::MessageId messageId, const void *data ) = 0;
143  };
144 }
145 }
146 
147 #endif
Demo::Mq::MessageQueueSystem::~MessageQueueSystem
virtual ~MessageQueueSystem()
Definition: MessageQueueSystem.h:50
Demo::Mq::MessageQueueSystem::queueSendMessage
void queueSendMessage(MessageQueueSystem *dstSystem, Mq::MessageId messageId, const T &msg)
Queues message 'msg' to be sent to a destination MessageQueueSystem.
Definition: MessageQueueSystem.h:66
Demo::Mq::MessageQueueSystem::processIncomingMessages
void processIncomingMessages(void)
Processes all incoming messages received from other threads.
Definition: MessageQueueSystem.h:114
MqMessages.h
Demo
Definition: BaseSystem.cpp:5
Demo::Mq::NUM_MESSAGE_IDS
@ NUM_MESSAGE_IDS
Definition: MqMessages.h:23
Demo::Mq::MessageQueueSystem::flushQueuedMessages
void flushQueuedMessages(void)
Sends all the messages queued via see queueSendMessage(); Must be called from the thread that owns 't...
Definition: MessageQueueSystem.h:73
Demo::Mq::MessageId
MessageId
Definition: MqMessages.h:12
Demo::Mq::MessageQueueSystem
Definition: MessageQueueSystem.h:16
Demo::Mq::MessageQueueSystem::processIncomingMessage
virtual void processIncomingMessage(Mq::MessageId messageId, const void *data)=0
Derived classes must implement this function to process the incoming message.
MessageQueueSystem.h
Demo::Mq::MessageQueueSystem::receiveMessageImmediately
void receiveMessageImmediately(Mq::MessageId messageId, const T &msg)
Sends a message to 'this' base system immediately.
Definition: MessageQueueSystem.h:104