X Tutup
/* This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ #include #include #include #include #include #include #include #include #include #include #include namespace { struct UdpSocketMaker : public sfn::UdpSocket {}; } namespace sfn { class UdpSocket::UdpSocketImpl { public: UdpSocketImpl( UdpSocket* owner ) : udp_socket{ owner }, asio_socket{ *static_cast( owner->GetIOService() ) } { } void SendHandler( const asio::error_code& error, std::size_t bytes_sent, asio::ip::udp::endpoint endpoint, std::shared_ptr> buffer ) { { auto lock = udp_socket->AcquireLock(); if( ( error == asio::error::operation_aborted ) || ( error == asio::error::connection_aborted ) || ( error == asio::error::connection_reset ) ) { return; } else if( error ) { ErrorMessage() << "Async Send Error: " << error.message() << "\n"; return; } if( bytes_sent == buffer->size() ) { return; } else { auto new_buffer = std::make_shared>( buffer->size() - bytes_sent ); std::copy_n( buffer->begin() + static_cast( bytes_sent ), new_buffer->size(), new_buffer->begin() ); asio_socket.async_send_to( asio::buffer( *new_buffer ), endpoint, static_cast( udp_socket->GetStrand() )->wrap( std::bind( []( std::weak_ptr socket, const asio::error_code& handler_error, std::size_t handler_bytes_sent, asio::ip::udp::endpoint handler_endpoint, std::shared_ptr> handler_buffer ) { auto shared_socket = socket.lock(); if( !shared_socket ) { return; } auto handler_lock = shared_socket->AcquireLock(); shared_socket->m_impl->SendHandler( handler_error, handler_bytes_sent, handler_endpoint, handler_buffer ); }, std::weak_ptr( udp_socket->shared_from_this() ), std::placeholders::_1, std::placeholders::_2, endpoint, new_buffer ) ) ); } } if( bytes_sent ) { udp_socket->OnSent(); } } void ReceiveHandler( const asio::error_code& error, std::size_t bytes_received, std::shared_ptr endpoint_ptr ) { { auto lock = udp_socket->AcquireLock(); if( receiving ) { return; } if( ( error == asio::error::operation_aborted ) || ( error == asio::error::connection_aborted ) || ( error == asio::error::connection_reset ) ) { return; } else if( error ) { ErrorMessage() << "Async Receive Error: " << error.message() << "\n"; return; } if( bytes_received ) { auto endpoint = *endpoint_ptr; receive_buffer[endpoint].insert( receive_buffer[endpoint].end(), receive_memory.begin(), receive_memory.begin() + bytes_received ); pending_data += bytes_received; } if( pending_data < GetMaximumBlockSize() ) { std::shared_ptr receive_endpoint_ptr = std::make_shared(); receiving = true; asio_socket.async_receive_from( asio::buffer( receive_memory ), *receive_endpoint_ptr, static_cast( udp_socket->GetStrand() )->wrap( std::bind( []( std::weak_ptr socket, const asio::error_code& handler_error, std::size_t handler_bytes_received, std::shared_ptr handler_endpoint_ptr ) { auto shared_socket = socket.lock(); if( !shared_socket ) { return; } auto handler_lock = shared_socket->AcquireLock(); shared_socket->m_impl->receiving = false; shared_socket->m_impl->ReceiveHandler( handler_error, handler_bytes_received, handler_endpoint_ptr ); }, std::weak_ptr( udp_socket->shared_from_this() ), std::placeholders::_1, std::placeholders::_2, receive_endpoint_ptr ) ) ); } } if( bytes_received ) { udp_socket->OnReceived(); } } UdpSocket* udp_socket; asio::ip::udp::socket asio_socket; std::map> receive_buffer; std::array receive_memory; std::size_t pending_data = 0; bool receiving = false; }; UdpSocket::UdpSocket() : m_impl{ make_unique( this ) } { } UdpSocket::~UdpSocket() { Close(); } UdpSocket::Ptr UdpSocket::Create() { return std::make_shared(); } void UdpSocket::Bind( const Endpoint& endpoint ) { auto lock = AcquireLock(); auto asio_endpoint = asio::ip::basic_endpoint{ endpoint.GetAddress().m_impl->address, endpoint.GetPort() }; if( !m_impl->asio_socket.is_open() ) { m_impl->asio_socket.open( asio_endpoint.protocol() ); } asio::error_code error; m_impl->asio_socket.bind( asio_endpoint, error ); if( error ) { ErrorMessage() << "Bind() Error: " << error.message() << "\n"; return; } m_impl->ReceiveHandler( asio::error_code{}, 0, nullptr ); } void UdpSocket::Close() { auto lock = AcquireLock(); if( !m_impl->asio_socket.is_open() ) { return; } m_impl->asio_socket.close(); } void UdpSocket::SendTo( const void* data, std::size_t size, const Endpoint& endpoint ) { auto lock = AcquireLock(); if( !data || !size ) { return; } auto asio_endpoint = asio::ip::basic_endpoint{ endpoint.GetAddress().m_impl->address, endpoint.GetPort() }; if( !m_impl->asio_socket.is_open() ) { m_impl->asio_socket.open( asio_endpoint.protocol() ); } auto buffer = std::make_shared>( size ); std::memcpy( &( (*buffer)[0] ), data, size ); m_impl->SendHandler( asio::error_code{}, 0, asio_endpoint, buffer ); } std::size_t UdpSocket::ReceiveFrom( void* data, std::size_t size, const Endpoint& endpoint ) { auto lock = AcquireLock(); if( !data || !size ) { return 0; } auto asio_endpoint = asio::ip::basic_endpoint{ endpoint.GetAddress().m_impl->address, endpoint.GetPort() }; if( !m_impl->asio_socket.is_open() ) { m_impl->asio_socket.open( asio_endpoint.protocol() ); } auto receive_size = std::min( size, m_impl->receive_buffer[asio_endpoint].size() ); for( std::size_t index = 0; index < receive_size; index++ ) { static_cast( data )[index] = m_impl->receive_buffer[asio_endpoint][index]; } m_impl->receive_buffer[asio_endpoint].erase( m_impl->receive_buffer[asio_endpoint].begin(), m_impl->receive_buffer[asio_endpoint].begin() + static_cast( receive_size ) ); if( m_impl->receive_buffer[asio_endpoint].empty() ) { m_impl->receive_buffer.erase( asio_endpoint ); } auto start = false; if( ( m_impl->pending_data >= GetMaximumBlockSize() ) && ( m_impl->pending_data - receive_size < GetMaximumBlockSize() ) ) { start = true; } m_impl->pending_data -= receive_size; if( start ) { m_impl->ReceiveHandler( asio::error_code{}, 0, nullptr ); } return receive_size; } Endpoint UdpSocket::GetLocalEndpoint() const { auto lock = AcquireLock(); IpAddress address; address.m_impl->address = m_impl->asio_socket.local_endpoint().address(); return { address, m_impl->asio_socket.local_endpoint().port() }; } void UdpSocket::ClearBuffers() { auto lock = AcquireLock(); m_impl->receive_buffer.clear(); } std::size_t UdpSocket::BytesToReceive( const Endpoint& endpoint ) const { auto lock = AcquireLock(); auto asio_endpoint = asio::ip::basic_endpoint{ endpoint.GetAddress().m_impl->address, endpoint.GetPort() }; auto iterator = m_impl->receive_buffer.find( asio_endpoint ); if( iterator == m_impl->receive_buffer.end() ) { return 0; } return iterator->second.size(); } std::deque UdpSocket::PendingEndpoints() const { auto lock = AcquireLock(); std::deque endpoints; for( auto buffer : m_impl->receive_buffer ) { IpAddress address; address.m_impl->address = buffer.first.address(); endpoints.emplace_back( address, buffer.first.port() ); } return endpoints; } /// @cond void UdpSocket::SetInternalSocket( void* internal_socket ) { m_impl->asio_socket = std::move( *static_cast( internal_socket ) ); } /// @endcond }
X Tutup