Divide Framework 0.1
A free and open-source 3D Framework under heavy development
Loading...
Searching...
No Matches
tcp_session_tpl.cpp
Go to the documentation of this file.
1
2
5
7
10
11#include <boost/asio/read.hpp>
12#include <boost/asio/write.hpp>
13
15// TCP //
17
18namespace Divide
19{
20
21 tcp_session_tpl::tcp_session_tpl( boost::asio::io_context& io_context, channel& ch )
22 : _header( 0 )
23 , _channel( ch )
24 , _socket( io_context )
25 , _inputDeadline( io_context.get_executor() )
26 , _nonEmptyOutputQueue( io_context.get_executor() )
27 , _outputDeadline( io_context.get_executor() )
28 , _startTime( time( nullptr ) )
29 , _strand( std::make_unique<boost::asio::io_context::strand>( io_context ) )
30 {
31 _inputDeadline.expires_at( boost::posix_time::pos_infin );
32 _outputDeadline.expires_at( boost::posix_time::pos_infin );
33 _nonEmptyOutputQueue.expires_at( boost::posix_time::pos_infin );
34 }
35
37 {
38 _channel.join( shared_from_this() );
39
40 start_read();
41
42 _inputDeadline.async_wait( _strand->wrap( [&]( const auto )
43 {
44 check_deadline( &_inputDeadline );
45 } ) );
46
48
49 _outputDeadline.async_wait( _strand->wrap( [&]( const auto )
50 {
51 check_deadline( &_outputDeadline );
52 } ) );
53 }
54
56 {
57 _channel.leave( shared_from_this() );
58
59 _socket.close();
60 _inputDeadline.cancel();
61 _nonEmptyOutputQueue.cancel();
62 _outputDeadline.cancel();
63 }
64
66 {
67 return !_socket.is_open();
68 }
69
71 {
72 _outputQueue.push_back( p );
73 _nonEmptyOutputQueue.expires_at( boost::posix_time::neg_infin );
74 }
75
76 void tcp_session_tpl::sendFile( const string& fileName )
77 {
78 _outputFileQueue.push_back( fileName );
79 }
80
82 {
83 _header = 0;
84 _inputBuffer.consume( _inputBuffer.size() );
85 _inputDeadline.expires_from_now( boost::posix_time::seconds( 30 ) );
86 async_read(
87 _socket,
88 boost::asio::buffer( &_header, sizeof _header ),
89 _strand->wrap(
90 [&]( const boost::system::error_code ec, const std::size_t N )
91 {
92 handle_read_body( ec, N );
93 } ) );
94 }
95
96 void tcp_session_tpl::handle_read_body( const boost::system::error_code& ec, [[maybe_unused]] size_t bytes_transferred )
97 {
98 if ( stopped() )
99 {
100 return;
101 }
102
103 if ( !ec )
104 {
105 _inputDeadline.expires_from_now( boost::posix_time::seconds( 30 ) );
106 async_read(
107 _socket, _inputBuffer.prepare( _header ),
108 _strand->wrap(
109 [&]( const boost::system::error_code code, const std::size_t N )
110 {
111 handle_read_packet( code, N );
112 } ) );
113 }
114 else
115 {
116 stop();
117 }
118 }
119
120 void tcp_session_tpl::handle_read_packet( const boost::system::error_code& ec, [[maybe_unused]] const size_t bytes_transferred )
121 {
122
123 if ( stopped() )
124 {
125 return;
126 }
127
128 if ( !ec )
129 {
130 _inputBuffer.commit( _header );
131 ASIO::LOG_PRINT( Util::StringFormat(LOCALE_STR("ASIO_BUFFER_SIZE"), _header ).c_str() );
132 WorldPacket packet;
133
134 if (!packet.loadFromBuffer(_inputBuffer))
135 {
136 ASIO::LOG_PRINT( Util::StringFormat( LOCALE_STR( "ASIO_EXCEPTION" ), "WorldPacket::loadFromBuffer" ).c_str(), true );
137 }
138
139 handlePacket( packet );
140 start_read();
141 }
142 else
143 {
144 stop();
145 }
146 }
147
149 {
150 if ( _outputQueue.empty() )
151 {
152 await_output();
153 }
154
155 // Set a deadline for the write operation.
156 _outputDeadline.expires_from_now( boost::posix_time::seconds( 30 ) );
157
158 boost::asio::streambuf buf;
159
160 WorldPacket& p = _outputQueue.front();
161 if (!p.saveToBuffer(buf))
162 {
163 ASIO::LOG_PRINT( Util::StringFormat( LOCALE_STR( "ASIO_EXCEPTION" ), "WorldPacket::saveToBuffer" ).c_str(), true );
164 }
165
166 size_t header = buf.size();
168 buffers.push_back( boost::asio::buffer( &header, sizeof header ) );
169 buffers.push_back( buf.data() );
170 // Start an asynchronous operation to send a message.
171 if ( p.opcode() == OPCodes::SMSG_SEND_FILE )
172 {
173 async_write(
174 _socket, buffers,
175 _strand->wrap(
176 [&]( const boost::system::error_code code, const size_t )
177 {
178 handle_write_file( code );
179 } ) );
180 }
181 else
182 {
183 async_write(
184 _socket, buffers,
185 _strand->wrap(
186 [&]( const boost::system::error_code code, const size_t )
187 {
188 handle_write( code );
189 } ) );
190 }
191 }
192 void tcp_session_tpl::handle_write_file( [[maybe_unused]] const boost::system::error_code& ec )
193 {
194
195 boost::asio::streambuf request_;
196 const string filePath = _outputFileQueue.front();
197 std::ifstream source_file;
198 source_file.open( filePath.c_str(),
199 std::ios_base::binary | std::ios_base::ate );
200 if ( !source_file )
201 {
202 ASIO::LOG_PRINT( Util::StringFormat(LOCALE_STR("ASIO_FAIL_OPEN_FILE"), filePath.c_str()).c_str(), true );
203 return;
204 }
205 const size_t file_size = source_file.tellg();
206 source_file.seekg( 0 );
207 // first send file name and file size to server
208 std::ostream request_stream( &request_ );
209 request_stream << filePath << "\n" << file_size << "\n\n";
210 ASIO::LOG_PRINT( Util::StringFormat(LOCALE_STR("ASIO_WRITE_FILE_REQUEST_SIZE"), request_.size()).c_str() );
211
212 // Start an asynchronous resolve to translate the server and service names
213 // into a list of endpoints.
214 _outputFileQueue.pop_front();
215 async_write( _socket,
216 request_,
217 _strand->wrap( [&]( const boost::system::error_code code, const size_t )
218 {
219 handle_write( code );
220 } ) );
221 }
222
223 void tcp_session_tpl::handle_write( const boost::system::error_code& ec )
224 {
225 if ( stopped() ) return;
226
227 if ( !ec )
228 {
229 _outputQueue.pop_front();
230 await_output();
231 }
232 else
233 {
234 stop();
235 }
236 }
237
239 {
240 if ( stopped() ) return;
241
242 if ( _outputQueue.empty() )
243 {
244 if ( _outputQueue.empty() )
245 {
246 _nonEmptyOutputQueue.expires_at( boost::posix_time::pos_infin );
247 _nonEmptyOutputQueue.async_wait( [&]( const boost::system::error_code )
248 {
249 await_output();
250 } );
251 }
252 }
253 else
254 {
255 start_write();
256 }
257 }
258
260 {
261 if ( stopped() ) return;
262
263 // Check whether the deadline has passed. We compare the deadline against
264 // the current time since a new asynchronous operation may have moved the
265 // deadline before this actor had a chance to run.
266 if ( deadline->expires_at() <= boost::asio::deadline_timer::traits_type::now() )
267 {
268 // The deadline has passed. Stop the session. The other actors will
269 // terminate as soon as possible.
270 stop();
271 }
272 else
273 {
274 // Put the actor back to sleep.
275 deadline->async_wait( [&]( const boost::system::error_code )
276 {
277 check_deadline( deadline );
278 } );
279 }
280 }
281
283 // UDP //
285
286 udp_broadcaster::udp_broadcaster( boost::asio::io_context& io_context,
287 const boost::asio::ip::udp::endpoint& broadcast_endpoint )
288 : socket_( io_context )
289 {
290 socket_.connect( broadcast_endpoint );
291 }
292
294 {
295 boost::asio::streambuf buf;
296 if (!p.saveToBuffer(buf))
297 {
298 ASIO::LOG_PRINT( Util::StringFormat( LOCALE_STR( "ASIO_EXCEPTION" ), "WorldPacket::saveToBuffer" ).c_str(), true );
299 }
300
301 size_t header = buf.size();
303 buffers.push_back( boost::asio::buffer( &header, sizeof header ) );
304 buffers.push_back( buf.data() );
305 boost::system::error_code ignored_ec;
306 socket_.send( buffers, 0, ignored_ec );
307 }
308
310 {
311 switch ( p.opcode() )
312 {
314 ASIO::LOG_PRINT( LOCALE_STR("ASIO_RECEIVED_HEARTBEAT") );
316 break;
318 ASIO::LOG_PRINT( LOCALE_STR("ASIO_RECEIVED_PING") );
319 HandlePingOpCode( p );
320 break;
323 break;
325 ASIO::LOG_PRINT( LOCALE_STR( "ASIO_RECEIVED_ENTITY_UPDATE" ) );
327 break;
328 default:
329 DIVIDE_UNEXPECTED_CALL_MSG( "Unknown network message!" );
330 break;
331 }
332 }
333
335 {
337 ASIO::LOG_PRINT( LOCALE_STR("ASIO_SEND_HEARBEAT") );
338 r << (I8)0;
339 sendPacket( r );
340 }
341
343 {
344 F32 time = 0;
345 p >> time;
346 ASIO::LOG_PRINT( Util::StringFormat(LOCALE_STR("ASIO_SEND_PONG"), time ).c_str() );
348 r << time;
349 sendPacket( r );
350 }
351
353 {
354 string client;
355 p >> client;
356 ASIO::LOG_PRINT( Util::StringFormat(LOCALE_STR("ASIO_RECEIVED_REQUEST_DISCONNECT"), client.c_str()).c_str() );
358 r << (U8)0; // this will be the error code returned after safely saving
359 // client
360 sendPacket( r );
361 }
362
364 {
365 UpdateEntities( p );
366 }
367
368}; // namespace Divide
#define LOCALE_STR(X)
Definition: Localization.h:91
#define DIVIDE_UNEXPECTED_CALL_MSG(X)
static void LOG_PRINT(const char *msg, bool error=false)
Definition: ASIO.cpp:123
static const ValueType SMSG_SEND_FILE
Definition: OPCodesTpl.h:19
static const ValueType CMSG_PING
Definition: OPCodesTpl.h:23
static const ValueType CMSG_ENTITY_UPDATE
Definition: OPCodesTpl.h:22
static const ValueType CMSG_REQUEST_DISCONNECT
Definition: OPCodesTpl.h:21
static const ValueType SMSG_PONG
Definition: OPCodesTpl.h:24
static const ValueType SMSG_DISCONNECT
Definition: OPCodesTpl.h:20
static const ValueType MSG_HEARTBEAT
Definition: OPCodesTpl.h:18
bool saveToBuffer(boost::asio::streambuf &buf) const
Definition: WorldPacket.cpp:50
bool loadFromBuffer(boost::asio::streambuf &buf)
Definition: WorldPacket.cpp:27
void leave(const subscriber_ptr &sub)
void join(const subscriber_ptr &sub)
eastl::deque< WorldPacket > _outputQueue
deadline_timer _inputDeadline
eastl::deque< string > _outputFileQueue
virtual void HandlePingOpCode(WorldPacket &p)
virtual void HandleEntityUpdateOpCode(WorldPacket &p)
boost::asio::streambuf _inputBuffer
virtual void sendFile(const string &fileName)
virtual void handlePacket(WorldPacket &p)
virtual void HandleDisconnectOpCode(WorldPacket &p)
virtual void handle_write_file(const boost::system::error_code &ec)
virtual void handle_read_body(const boost::system::error_code &ec, size_t bytes_transferred)
std::unique_ptr< boost::asio::io_context::strand > _strand
deadline_timer _outputDeadline
tcp_session_tpl(boost::asio::io_context &io_context, channel &ch)
deadline_timer _nonEmptyOutputQueue
virtual void handle_read_packet(const boost::system::error_code &ec, size_t bytes_transferred)
virtual void HandleHeartBeatOpCode(WorldPacket &p)
void sendPacket(const WorldPacket &p) override
virtual bool stopped() const
virtual void handle_write(const boost::system::error_code &ec)
virtual void check_deadline(deadline_timer *deadline)
void sendPacket(const WorldPacket &p) override
udp_broadcaster(boost::asio::io_context &io_context, const boost::asio::ip::udp::endpoint &broadcast_endpoint)
Str StringFormat(const char *fmt, Args &&...args)
Handle console commands that start with a forward slash.
Definition: AIProcessor.cpp:7
void UpdateEntities(WorldPacket &p)
uint8_t U8
eastl::vector< Type > vector
Definition: Vector.h:42
boost::asio::basic_deadline_timer< boost::posix_time::ptime, boost::asio::time_traits< boost::posix_time::ptime >, boost::asio::io_context::executor_type > deadline_timer
Definition: Utils.h:43