/*
Mosh: the mobile shell
Copyright 2012 Keith Winstein
This program is free software: you can redistribute it and/or modify
it under the terms of the GNU General Public License as published by
the Free Software Foundation, either version 3 of the License, or
(at your option) any later version.
This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include
#include
#include
#include
#include
#include "transportsender.h"
#include "transportfragment.h"
using namespace Network;
using namespace std;
/* Construct a sender that transmits over s_connection, starting from initial_state.
   The queue of sent states is seeded with exactly one entry (number 0, stamped
   with the current timestamp()) built from initial_state, and
   assumed_receiver_state starts out pointing at that entry. Both the ack timer
   and the send timer start at the current time; all flags start false and all
   counters start at zero. */
template
TransportSender::TransportSender( Connection *s_connection, MyState &initial_state )
: connection( s_connection ),
current_state( initial_state ),
sent_states( 1, TimestampedState( timestamp(), 0, initial_state ) ),
/* NOTE(review): this takes an iterator into sent_states, so it relies on
   sent_states being declared (and therefore initialized) before
   assumed_receiver_state in the class; the declaration is not visible here -- confirm */
assumed_receiver_state( sent_states.begin() ),
fragmenter(),
next_ack_time( timestamp() ),
next_send_time( timestamp() ),
verbose( false ),
shutdown_in_progress( false ),
shutdown_tries( 0 ),
ack_num( 0 ),
pending_data_ack( false ),
/* upper bound, added to "now" in calculate_timers(), on how long pending data waits before a send is scheduled */
SEND_MINDELAY( 15 ),
last_heard( 0 ),
prng()
{
}
/* Spacing between frames: half the smoothed RTT (rounded up), so that roughly
   two frames go out per RTT, clamped to [SEND_INTERVAL_MIN, SEND_INTERVAL_MAX] */
template
unsigned int TransportSender::send_interval( void ) const
{
  const int half_rtt = lrint( ceil( connection->get_SRTT() / 2.0 ) );

  int clamped = half_rtt;
  if ( half_rtt < SEND_INTERVAL_MIN ) {
    /* don't exceed the maximum frame rate */
    clamped = SEND_INTERVAL_MIN;
  } else if ( half_rtt > SEND_INTERVAL_MAX ) {
    /* don't fall below the minimum frame rate */
    clamped = SEND_INTERVAL_MAX;
  }

  return clamped;
}
/* Housekeeping routine: refresh the assumed receiver state, rationalize the
   stored states, then recompute next_ack_time and next_send_time */
template
void TransportSender::calculate_timers( void )
{
  const uint64_t now = timestamp();

  /* Refresh our estimate of what the receiver has, then cut the common
     prefix out of all states */
  update_assumed_receiver_state();
  rationalize_states();

  /* An ack owed for received data goes out no later than ACK_DELAY from now */
  const uint64_t ack_deadline = now + ACK_DELAY;
  if ( pending_data_ack && (next_ack_time > ack_deadline) ) {
    next_ack_time = ack_deadline;
  }

  /* Decide whether there is data waiting to go out */
  bool data_pending = false;
  if ( !(current_state == assumed_receiver_state->state)
       && (last_heard + ACTIVE_RETRY_TIMEOUT > now) ) {
    /* differs from the assumed receiver state and last_heard is still
       within ACTIVE_RETRY_TIMEOUT */
    data_pending = true;
  } else if ( !(current_state == sent_states.back().state) ) {
    /* differs from the most recently sent state */
    data_pending = true;
  }

  if ( !data_pending ) {
    next_send_time = uint64_t(-1); /* nothing to send: disarm the send timer */
  } else {
    /* send no later than SEND_MINDELAY from now ... */
    const uint64_t latest_send = now + SEND_MINDELAY;
    if ( next_send_time > latest_send ) {
      next_send_time = latest_send;
    }

    /* ... but no sooner than one send interval after the last transmission */
    const uint64_t earliest_send = sent_states.back().timestamp + send_interval();
    if ( next_send_time < earliest_send ) {
      next_send_time = earliest_send;
    }
  }

  /* speed up shutdown sequence */
  if ( shutdown_in_progress || (ack_num == uint64_t(-1)) ) {
    next_ack_time = sent_states.back().timestamp + send_interval();
  }
}
/* How many ms until the next ack or send is due: 0 if one is already due,
   INT_MAX while the connection has no remote address. Recalculates the
   timers as a side effect. */
template
int TransportSender::wait_time( void )
{
  calculate_timers();

  /* wake up for whichever timer expires first */
  const uint64_t next_wakeup = ( next_send_time < next_ack_time ) ? next_send_time
                                                                   : next_ack_time;
  const uint64_t now = timestamp();

  if ( !connection->get_has_remote_addr() ) {
    return INT_MAX;
  }

  if ( next_wakeup <= now ) {
    return 0;
  }
  return next_wakeup - now;
}
/* Send a diff or an empty ack if one is due. Does nothing while the
   connection has no remote address or while neither timer has expired. */
template
void TransportSender::tick( void )
{
  calculate_timers(); /* also updates the assumed receiver state and rationalizes */

  if ( !connection->get_has_remote_addr() ) {
    return;
  }

  const uint64_t now = timestamp();
  const bool ack_due = ( now >= next_ack_time );
  const bool send_due = ( now >= next_send_time );
  if ( !ack_due && !send_due ) {
    return; /* neither timer has fired yet */
  }

  /* What (if anything) separates our state from the assumed receiver state? */
  string diff = current_state.diff_from( assumed_receiver_state->state );

  /* verify diff has round-trip identity (modulo Unicode fallback rendering) */
  /*
  MyState newstate( assumed_receiver_state->state );
  newstate.apply_string( diff );
  if ( current_state.compare( newstate ) ) {
    fprintf( stderr, "Diff: %s\n", diff.c_str() );
  }
  */

  if ( diff.empty() ) {
    /* nothing new to say: only a bare ack may be owed */
    if ( ack_due ) {
      send_empty_ack();
    }
    return;
  }

  if ( send_due || ack_due ) {
    /* Send diffs or ack */
    send_to_receiver( diff );
  }
}
/* Send an instruction with an empty diff. The current state is queued under
   a fresh number, the ack timer is pushed out by ACK_INTERVAL and the send
   timer is disarmed. Must only be called once the ack timer has expired. */
template
void TransportSender::send_empty_ack( void )
{
  const uint64_t now = timestamp();
  assert( now >= next_ack_time );

  /* normally the next number in sequence; the shutdown sequence uses the
     special number uint64_t(-1) */
  const uint64_t new_num = shutdown_in_progress ? uint64_t( -1 )
                                                : sent_states.back().num + 1;

  /* (an earlier variant, left commented out in the original, stamped the new
     entry with sent_states.back().timestamp instead of now) */
  add_sent_state( now, new_num, current_state );
  send_in_fragments( "", new_num );

  next_send_time = uint64_t(-1);
  next_ack_time = now + ACK_INTERVAL;
}
/* Append a (timestamp, number, state) entry to the queue of sent states.
   Once the queue holds more than 32 entries, the entry 16 places back from
   the end is dropped to bound its length. */
template
void TransportSender::add_sent_state( uint64_t the_timestamp, uint64_t num, MyState &state )
{
  sent_states.push_back( TimestampedState( the_timestamp, num, state ) );

  if ( sent_states.size() <= 32 ) { /* limit on state queue not reached */
    return;
  }

  /* step back 16 entries from the end and erase that state from the middle of the queue */
  typename sent_states_type::iterator victim = sent_states.end();
  int steps_left = 16;
  while ( steps_left > 0 ) {
    --victim;
    --steps_left;
  }
  sent_states.erase( victim );
}
/* Transmit diff to the receiver. The current state is queued under a new
   number unless it equals the most recently sent state, in which case that
   entry is reused and only its timestamp is refreshed. Afterwards the newest
   queued state becomes the assumed receiver state and the timers are reset. */
template
void TransportSender::send_to_receiver( string diff )
{
  const bool previously_sent = ( current_state == sent_states.back().state );

  uint64_t new_num = previously_sent ? sent_states.back().num
                                     : sent_states.back().num + 1;

  /* special case for shutdown sequence */
  if ( shutdown_in_progress ) {
    new_num = uint64_t( -1 );
  }

  if ( new_num != sent_states.back().num ) {
    add_sent_state( timestamp(), new_num, current_state );
  } else {
    /* retransmission of the newest state: just restamp it */
    sent_states.back().timestamp = timestamp();
  }

  send_in_fragments( diff, new_num ); // Can throw NetworkException

  /* successfully sent, probably */
  /* ("probably" because the FIRST size-exceeded datagram doesn't get an error) */
  assumed_receiver_state = --sent_states.end();

  next_ack_time = timestamp() + ACK_INTERVAL;
  next_send_time = uint64_t(-1);
}
/* Re-derive assumed_receiver_state. Start from the front of the queue and
   advance through the following states for as long as each was transmitted
   less than connection->timeout() + ACK_DELAY ago; stop at the first one
   that is older than that. */
template
void TransportSender::update_assumed_receiver_state( void )
{
  const uint64_t now = timestamp();

  /* the front entry is what is known; later, unacknowledged entries get the
     benefit of the doubt if they were sent recently enough */
  assumed_receiver_state = sent_states.begin();

  typename sent_states_type::iterator candidate = sent_states.begin();
  for ( ++candidate; candidate != sent_states.end(); ++candidate ) {
    assert( now >= candidate->timestamp );

    if ( uint64_t(now - candidate->timestamp) >= connection->timeout() + ACK_DELAY ) {
      return; /* too old to assume it arrived; keep the previous choice */
    }
    assumed_receiver_state = candidate;
  }
}
/* Subtract the state at the front of the sent queue (the known receiver
   state) from the current state and from every queued state */
template
void TransportSender::rationalize_states( void )
{
  const MyState * const reference = &sent_states.front().state;

  current_state.subtract( reference );

  /* Walk from newest to oldest so that the front entry -- the reference
     itself -- is handled last */
  typename sent_states_type::reverse_iterator entry = sent_states.rbegin();
  while ( entry != sent_states.rend() ) {
    entry->state.subtract( reference );
    entry++;
  }
}
/* Produce a chaff string of between 0 and CHAFF_MAX bytes; both the length
   and the contents are obtained from prng */
template
const string TransportSender::make_chaff( void )
{
  const size_t CHAFF_MAX = 16;
  char buffer[ CHAFF_MAX ];

  /* length is in [0, CHAFF_MAX], so it never exceeds the buffer */
  const size_t length = prng.uint8() % (CHAFF_MAX + 1);
  prng.fill( buffer, length );

  string chaff_string( buffer, length );
  return chaff_string;
}
/* Wrap diff in an Instruction and transmit it over the connection as one or
   more fragments.
   diff:    payload to carry (may be empty, as for a bare ack)
   new_num: number of the state this instruction brings the receiver to;
            uint64_t(-1) marks a shutdown message and bumps shutdown_tries
   Clears pending_data_ack once every fragment has been handed to the
   connection. */
template
void TransportSender::send_in_fragments( string diff, uint64_t new_num )
{
Instruction inst;
inst.set_protocol_version( MOSH_PROTOCOL_VERSION );
/* the diff is expressed relative to the state we assume the receiver has */
inst.set_old_num( assumed_receiver_state->num );
inst.set_new_num( new_num );
/* the member ack_num, as last stored by set_ack_num() */
inst.set_ack_num( ack_num );
/* number of the oldest state still held in our queue */
inst.set_throwaway_num( sent_states.front().num );
inst.set_diff( diff );
inst.set_chaff( make_chaff() );
/* count shutdown attempts; shutdown_ack_timed_out() compares this against SHUTDOWN_RETRIES */
if ( new_num == uint64_t(-1) ) {
shutdown_tries++;
}
/* split the instruction into pieces sized for the connection's MTU */
vector fragments = fragmenter.make_fragments( inst, connection->get_MTU() );
for ( vector::iterator i = fragments.begin();
i != fragments.end();
i++ ) {
connection->send( i->tostring() );
/* optional per-fragment trace on stderr; numeric fields are cast to int for printing */
if ( verbose ) {
fprintf( stderr, "[%u] Sent [%d=>%d] id %d, frag %d ack=%d, throwaway=%d, len=%d, frame rate=%.2f, timeout=%d, srtt=%.1f\n",
(unsigned int)(timestamp() % 100000), (int)inst.old_num(), (int)inst.new_num(), (int)i->id, (int)i->fragment_num,
(int)inst.ack_num(), (int)inst.throwaway_num(), (int)i->contents.size(),
1000.0 / (double)send_interval(),
(int)connection->timeout(), connection->get_SRTT() );
}
}
/* every outgoing instruction carries ack_num, so no separate data ack is owed now */
pending_data_ack = false;
}
/* Process an acknowledgment from the receiver for state number ack_num.
   If a queued sent state matches that number (num_eq), every queued state
   for which num_lt( ack_num ) holds is dropped from the queue. If no queued
   state matches -- the acknowledged state has already been culled -- the
   ack is ignored and the queue is left untouched.
   Note: the parameter ack_num shadows the member of the same name; only the
   parameter is used here.
   NOTE(review): erasing entries invalidates any iterator pointing at them;
   assumed_receiver_state is re-derived by update_assumed_receiver_state(),
   presumably before its next use -- confirm against callers. */
template
void TransportSender::process_acknowledgment_through( uint64_t ack_num )
{
  /* Explicit loops replace find_if/remove_if with bind2nd + mem_fun_ref:
     those adaptors are deprecated since C++11 and removed in C++17. */

  /* Ignore ack if we have culled the state it's acknowledging */
  typename sent_states_type::iterator i = sent_states.begin();
  while ( (i != sent_states.end()) && !i->num_eq( ack_num ) ) {
    i++;
  }

  if ( i != sent_states.end() ) {
    i = sent_states.begin();
    while ( i != sent_states.end() ) {
      if ( i->num_lt( ack_num ) ) {
        i = sent_states.erase( i ); /* erase() returns the following element */
      } else {
        i++;
      }
    }
  }

  assert( !sent_states.empty() );
}
/* True once SHUTDOWN_RETRIES or more shutdown messages have been sent, i.e.
   it is time to give up waiting for the shutdown to be acknowledged */
template
bool TransportSender::shutdown_ack_timed_out( void ) const
{
  const bool retries_exhausted = ( shutdown_tries >= SHUTDOWN_RETRIES );
  return retries_exhausted;
}
/* Executed upon entry to a new receiver state: record the number that
   subsequent outgoing instructions will carry as their ack */
template
void TransportSender::set_ack_num( uint64_t s_ack_num )
{
  this->ack_num = s_ack_num;
}