gnunet-service-transport_neighbours.c | gnunet-service-transport_neighbours.c | |||
---|---|---|---|---|
/* | /* | |||
This file is part of GNUnet. | This file is part of GNUnet. | |||
(C) 2010,2011 Christian Grothoff (and other contributing authors) | (C) 2010,2011,2012 Christian Grothoff (and other contributing authors) | |||
GNUnet is free software; you can redistribute it and/or modify | GNUnet is free software; you can redistribute it and/or modify | |||
it under the terms of the GNU General Public License as published | it under the terms of the GNU General Public License as published | |||
by the Free Software Foundation; either version 3, or (at your | by the Free Software Foundation; either version 3, or (at your | |||
option) any later version. | option) any later version. | |||
GNUnet is distributed in the hope that it will be useful, but | GNUnet is distributed in the hope that it will be useful, but | |||
WITHOUT ANY WARRANTY; without even the implied warranty of | WITHOUT ANY WARRANTY; without even the implied warranty of | |||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU | |||
General Public License for more details. | General Public License for more details. | |||
skipping to change at line 25 | skipping to change at line 25 | |||
You should have received a copy of the GNU General Public License | You should have received a copy of the GNU General Public License | |||
along with GNUnet; see the file COPYING. If not, write to the | along with GNUnet; see the file COPYING. If not, write to the | |||
Free Software Foundation, Inc., 59 Temple Place - Suite 330, | Free Software Foundation, Inc., 59 Temple Place - Suite 330, | |||
Boston, MA 02111-1307, USA. | Boston, MA 02111-1307, USA. | |||
*/ | */ | |||
/** | /** | |||
* @file transport/gnunet-service-transport_neighbours.c | * @file transport/gnunet-service-transport_neighbours.c | |||
* @brief neighbour management | * @brief neighbour management | |||
* @author Christian Grothoff | * @author Christian Grothoff | |||
* | ||||
* TODO: | ||||
* - "address_change_cb" is NEVER invoked; when should we call this one exa | ||||
ctly? | ||||
* - TEST, TEST, TEST... | ||||
*/ | */ | |||
#include "platform.h" | #include "platform.h" | |||
#include "gnunet_ats_service.h" | #include "gnunet_ats_service.h" | |||
#include "gnunet-service-transport_neighbours.h" | #include "gnunet-service-transport_neighbours.h" | |||
#include "gnunet-service-transport_plugins.h" | #include "gnunet-service-transport_plugins.h" | |||
#include "gnunet-service-transport_validation.h" | #include "gnunet-service-transport_validation.h" | |||
#include "gnunet-service-transport_clients.h" | #include "gnunet-service-transport_clients.h" | |||
#include "gnunet-service-transport.h" | #include "gnunet-service-transport.h" | |||
#include "gnunet_peerinfo_service.h" | #include "gnunet_peerinfo_service.h" | |||
#include "gnunet-service-transport_blacklist.h" | #include "gnunet-service-transport_blacklist.h" | |||
#include "gnunet_constants.h" | #include "gnunet_constants.h" | |||
#include "transport.h" | #include "transport.h" | |||
/** | /** | |||
* Size of the neighbour hash map. | * Size of the neighbour hash map. | |||
*/ | */ | |||
#define NEIGHBOUR_TABLE_SIZE 256 | #define NEIGHBOUR_TABLE_SIZE 256 | |||
/** | /** | |||
* Time we give plugin to transmit DISCONNECT message before the | ||||
* neighbour entry self-destructs. | ||||
*/ | ||||
#define DISCONNECT_SENT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_ | ||||
UNIT_MILLISECONDS, 100) | ||||
/** | ||||
* How often must a peer violate bandwidth quotas before we start | * How often must a peer violate bandwidth quotas before we start | |||
* to simply drop its messages? | * to simply drop its messages? | |||
*/ | */ | |||
#define QUOTA_VIOLATION_DROP_THRESHOLD 10 | #define QUOTA_VIOLATION_DROP_THRESHOLD 10 | |||
/** | /** | |||
* How often do we send KEEPALIVE messages to each of our neighbours and me asure | * How often do we send KEEPALIVE messages to each of our neighbours and me asure | |||
* the latency with this neighbour? | * the latency with this neighbour? | |||
* (idle timeout is 5 minutes or 300 seconds, so with 30s interval we | * (idle timeout is 5 minutes or 300 seconds, so with 100s interval we | |||
* send 10 keepalives in each interval, so 10 messages would need to be | * send 3 keepalives in each interval, so 3 messages would need to be | |||
* lost in a row for a disconnect). | * lost in a row for a disconnect). | |||
*/ | */ | |||
#define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT _SECONDS, 30) | #define KEEPALIVE_FREQUENCY GNUNET_TIME_relative_multiply (GNUNET_TIME_UNIT _SECONDS, 100) | |||
#define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNI | /** | |||
T_SECONDS, 5) | * How long are we willing to wait for a response from ATS before timing ou | |||
t? | ||||
#define FAST_RECONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_U | */ | |||
NIT_SECONDS, 1) | #define ATS_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_UNI | |||
T_MILLISECONDS, 500) | ||||
/** | ||||
* How long are we willing to wait for an ACK from the other peer before | ||||
* giving up on our connect operation? | ||||
*/ | ||||
#define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME _UNIT_SECONDS, 15) | #define SETUP_CONNECTION_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME _UNIT_SECONDS, 15) | |||
#define TEST_NEW_CODE GNUNET_NO | /** | |||
* How long are we willing to wait for a successful reconnect if | ||||
* an existing connection went down? Much shorter than the | ||||
* usual SETUP_CONNECTION_TIMEOUT as we do not inform the | ||||
* higher layers about the disconnect during this period. | ||||
*/ | ||||
#define FAST_RECONNECT_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TIME_U | ||||
NIT_SECONDS, 1) | ||||
/** | /** | |||
* Entry in neighbours. | * How long are we willing to wait for a response from the blacklist | |||
* subsystem before timing out? | ||||
*/ | */ | |||
struct NeighbourMapEntry; | #define BLACKLIST_RESPONSE_TIMEOUT GNUNET_TIME_relative_multiply (GNUNET_TI ME_UNIT_MILLISECONDS, 500) | |||
GNUNET_NETWORK_STRUCT_BEGIN | GNUNET_NETWORK_STRUCT_BEGIN | |||
/** | /** | |||
* Message a peer sends to another to indicate its | * Message a peer sends to another to indicate that it intends to | |||
* preference for communicating via a particular | * setup a connection/session for data exchange. A 'SESSION_CONNECT' | |||
* session (and the desire to establish a real | * should be answered with a 'SESSION_CONNECT_ACK' with the same body | |||
* connection). | * to confirm. A 'SESSION_CONNECT_ACK' should then be followed with | |||
* a 'SESSION_ACK'. Once the 'SESSION_ACK' is received, both peers | ||||
* should be connected. | ||||
*/ | */ | |||
struct SessionConnectMessage | struct SessionConnectMessage | |||
{ | { | |||
/** | /** | |||
* Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT' | * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT' | |||
* or 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK' | ||||
*/ | */ | |||
struct GNUNET_MessageHeader header; | struct GNUNET_MessageHeader header; | |||
/** | /** | |||
* Always zero. | * Always zero. | |||
*/ | */ | |||
uint32_t reserved GNUNET_PACKED; | uint32_t reserved GNUNET_PACKED; | |||
/** | /** | |||
* Absolute time at the sender. Only the most recent connect | * Absolute time at the sender. Only the most recent connect | |||
* message implies which session is preferred by the sender. | * message implies which session is preferred by the sender. | |||
*/ | */ | |||
struct GNUNET_TIME_AbsoluteNBO timestamp; | struct GNUNET_TIME_AbsoluteNBO timestamp; | |||
}; | }; | |||
/** | ||||
* Message we send to the other peer to notify him that we intentionally | ||||
* are disconnecting (to reduce timeouts). This is just a friendly | ||||
* notification, peers must not rely on always receiving disconnect | ||||
* messages. | ||||
*/ | ||||
struct SessionDisconnectMessage | struct SessionDisconnectMessage | |||
{ | { | |||
/** | /** | |||
* Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT' | * Header of type 'GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT' | |||
*/ | */ | |||
struct GNUNET_MessageHeader header; | struct GNUNET_MessageHeader header; | |||
/** | /** | |||
* Always zero. | * Always zero. | |||
*/ | */ | |||
skipping to change at line 136 | skipping to change at line 167 | |||
struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key; | struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded public_key; | |||
/** | /** | |||
* Signature of the peer that sends us the disconnect. Only | * Signature of the peer that sends us the disconnect. Only | |||
* valid if the timestamp is AFTER the timestamp from the | * valid if the timestamp is AFTER the timestamp from the | |||
* corresponding 'CONNECT' message. | * corresponding 'CONNECT' message. | |||
*/ | */ | |||
struct GNUNET_CRYPTO_RsaSignature signature; | struct GNUNET_CRYPTO_RsaSignature signature; | |||
}; | }; | |||
GNUNET_NETWORK_STRUCT_END | GNUNET_NETWORK_STRUCT_END | |||
/** | /** | |||
* For each neighbour we keep a list of messages | * For each neighbour we keep a list of messages | |||
* that we still want to transmit to the neighbour. | * that we still want to transmit to the neighbour. | |||
*/ | */ | |||
struct MessageQueue | struct MessageQueue | |||
{ | { | |||
/** | /** | |||
* This is a doubly linked list. | * This is a doubly linked list. | |||
*/ | */ | |||
struct MessageQueue *next; | struct MessageQueue *next; | |||
/** | /** | |||
* This is a doubly linked list. | * This is a doubly linked list. | |||
*/ | */ | |||
struct MessageQueue *prev; | struct MessageQueue *prev; | |||
/** | /** | |||
* Once this message is actively being transmitted, which | ||||
* neighbour is it associated with? | ||||
*/ | ||||
struct NeighbourMapEntry *n; | ||||
/** | ||||
* Function to call once we're done. | * Function to call once we're done. | |||
*/ | */ | |||
GST_NeighbourSendContinuation cont; | GST_NeighbourSendContinuation cont; | |||
/** | /** | |||
* Closure for 'cont' | * Closure for 'cont' | |||
*/ | */ | |||
void *cont_cls; | void *cont_cls; | |||
/** | /** | |||
skipping to change at line 189 | skipping to change at line 215 | |||
*/ | */ | |||
size_t message_buf_size; | size_t message_buf_size; | |||
/** | /** | |||
* At what time should we fail? | * At what time should we fail? | |||
*/ | */ | |||
struct GNUNET_TIME_Absolute timeout; | struct GNUNET_TIME_Absolute timeout; | |||
}; | }; | |||
/** | ||||
* Possible state of a neighbour. Initially, we are S_NOT_CONNECTED. | ||||
* | ||||
* Then, there are two main paths. If we receive a CONNECT message, we | ||||
* first run a check against the blacklist and ask ATS for a | ||||
* suggestion. (S_CONNECT_RECV_ATS). If the blacklist comes back | ||||
* positive, we give the address to ATS. If ATS makes a suggestion, | ||||
* we ALSO give that suggestion to the blacklist | ||||
* (S_CONNECT_RECV_BLACKLIST). Once the blacklist approves the | ||||
* address we got from ATS, we send our CONNECT_ACK and go to | ||||
* S_CONNECT_RECV_ACK. If we receive a SESSION_ACK, we go to | ||||
* S_CONNECTED (and notify everyone about the new connection). If the | ||||
* operation times out, we go to S_DISCONNECT. | ||||
* | ||||
* The other case is where we transmit a CONNECT message first. We | ||||
* start with S_INIT_ATS. If we get an address, we enter | ||||
* S_INIT_BLACKLIST and check the blacklist. If the blacklist is OK | ||||
* with the connection, we actually send the CONNECT message and go to | ||||
* state S_CONNECT_SENT. Once we receive a CONNECT_ACK, we go to | ||||
* S_CONNECTED (and notify everyone about the new connection and send | ||||
* back a SESSION_ACK). If the operation times out, we go to | ||||
* S_DISCONNECT. | ||||
* | ||||
* If the session is in trouble (i.e. transport-level disconnect or | ||||
* timeout), we go to S_RECONNECT_ATS where we ask ATS for a new | ||||
* address (we don't notify anyone about the disconnect yet). Once we | ||||
* have a new address, we go to S_RECONNECT_BLACKLIST to check the new | ||||
* address against the blacklist. If the blacklist approves, we enter | ||||
* S_RECONNECT_SENT and send a CONNECT message. If we receive a | ||||
* CONNECT_ACK, we go to S_CONNECTED and nobody noticed that we had | ||||
* trouble; we also send a SESSION_ACK at this time just in case. If | ||||
* the operation times out, we go to S_DISCONNECT (and notify everyone | ||||
* about the lost connection). | ||||
* | ||||
* If ATS decides to switch addresses while we have a normal | ||||
* connection, we go to S_CONNECTED_SWITCHING_BLACKLIST to check the | ||||
* new address against the blacklist. If the blacklist approves, we | ||||
* go to S_CONNECTED_SWITCHING_CONNECT_SENT and send a | ||||
* SESSION_CONNECT. If we get a SESSION_ACK back, we switch the | ||||
* primary connection to the suggested alternative from ATS, go back | ||||
* to S_CONNECTED and send a SESSION_ACK to the other peer just to be | ||||
* sure. If the operation times out (or the blacklist disapproves), | ||||
* we go to S_CONNECTED (and notify ATS that the given alternative | ||||
* address is "invalid"). | ||||
* | ||||
* Once a session is in S_DISCONNECT, it is cleaned up and then goes | ||||
* to (S_DISCONNECT_FINISHED). If we receive an explicit disconnect | ||||
* request, we can go from any state to S_DISCONNECT, possibly after | ||||
* generating disconnect notifications. | ||||
* | ||||
* Note that it is quite possible that while we are in any of these | ||||
* states, we could receive a 'CONNECT' request from the other peer. | ||||
* We then enter a 'weird' state where we pursue our own primary state | ||||
* machine (as described above), but with the 'send_connect_ack' flag | ||||
* set to 1. If our state machine allows us to send a 'CONNECT_ACK' | ||||
* (because we have an acceptable address), we send the 'CONNECT_ACK' | ||||
* and set the 'send_connect_ack' to 2. If we then receive a | ||||
* 'SESSION_ACK', we go to 'S_CONNECTED' (and reset 'send_connect_ack' | ||||
* to 0). | ||||
* | ||||
*/ | ||||
enum State | enum State | |||
{ | { | |||
/** | /** | |||
* fresh peer or completely disconnected | * fresh peer or completely disconnected | |||
*/ | */ | |||
S_NOT_CONNECTED, | S_NOT_CONNECTED = 0, | |||
/** | ||||
* Asked to initiate connection, trying to get address from ATS | ||||
*/ | ||||
S_INIT_ATS, | ||||
/** | /** | |||
* sent CONNECT message to other peer, waiting for CONNECT_ACK | * Asked to initiate connection, trying to get address approved | |||
* by blacklist. | ||||
*/ | ||||
S_INIT_BLACKLIST, | ||||
/** | ||||
* Sent CONNECT message to other peer, waiting for CONNECT_ACK | ||||
*/ | */ | |||
S_CONNECT_SENT, | S_CONNECT_SENT, | |||
/** | /** | |||
* received CONNECT message to other peer, sending CONNECT_ACK | * Received a CONNECT, asking ATS about address suggestions. | |||
*/ | ||||
S_CONNECT_RECV_ATS, | ||||
/** | ||||
* Received CONNECT from other peer, got an address, checking with blackl | ||||
ist. | ||||
*/ | ||||
S_CONNECT_RECV_BLACKLIST, | ||||
/** | ||||
* CONNECT request from other peer was SESSION_ACK'ed, waiting for | ||||
* SESSION_ACK. | ||||
*/ | */ | |||
S_CONNECT_RECV, | S_CONNECT_RECV_ACK, | |||
/** | /** | |||
* received ACK or payload | * Got our CONNECT_ACK/SESSION_ACK, connection is up. | |||
*/ | */ | |||
S_CONNECTED, | S_CONNECTED, | |||
/** | /** | |||
* connection ended, fast reconnect | * Connection got into trouble, rest of the system still believes | |||
* it to be up, but we're getting a new address from ATS. | ||||
*/ | ||||
S_RECONNECT_ATS, | ||||
/** | ||||
* Connection got into trouble, rest of the system still believes | ||||
* it to be up; we are checking the new address against the blacklist. | ||||
*/ | ||||
S_RECONNECT_BLACKLIST, | ||||
/** | ||||
* Sent CONNECT over new address (either by ATS telling us to switch | ||||
* addresses or from RECONNECT_ATS); if this fails, we need to tell | ||||
* the rest of the system about a disconnect. | ||||
*/ | ||||
S_RECONNECT_SENT, | ||||
/** | ||||
* We have some primary connection, but ATS suggested we switch | ||||
* to some alternative; we're now checking the alternative against | ||||
* the blacklist. | ||||
*/ | ||||
S_CONNECTED_SWITCHING_BLACKLIST, | ||||
/** | ||||
* We have some primary connection, but ATS suggested we switch | ||||
* to some alternative; we now sent a CONNECT message for the | ||||
* alternative session to the other peer and waiting for a | ||||
* CONNECT_ACK to make this our primary connection. | ||||
*/ | ||||
S_CONNECTED_SWITCHING_CONNECT_SENT, | ||||
/** | ||||
* Disconnect in progress (we're sending the DISCONNECT message to the | ||||
* other peer; after that is finished, the state will be cleaned up). | ||||
*/ | */ | |||
S_FAST_RECONNECT, | S_DISCONNECT, | |||
/** | /** | |||
* Disconnect in progress | * We're finished with the disconnect; clean up state now! | |||
*/ | */ | |||
S_DISCONNECT | S_DISCONNECT_FINISHED | |||
}; | }; | |||
enum Address_State | /** | |||
* A possible address we could use to communicate with a neighbour. | ||||
*/ | ||||
struct NeighbourAddress | ||||
{ | { | |||
USED, | ||||
UNUSED, | /** | |||
FRESH, | * Active session for this address. | |||
*/ | ||||
struct Session *session; | ||||
/** | ||||
* Network-level address information. | ||||
*/ | ||||
struct GNUNET_HELLO_Address *address; | ||||
/** | ||||
* Timestamp of the 'SESSION_CONNECT' message we sent to the other | ||||
* peer for this address. Use to check that the ACK is in response | ||||
* to our most recent 'CONNECT'. | ||||
*/ | ||||
struct GNUNET_TIME_Absolute connect_timestamp; | ||||
/** | ||||
* Inbound bandwidth from ATS for this address. | ||||
*/ | ||||
struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in; | ||||
/** | ||||
* Outbound bandwidth from ATS for this address. | ||||
*/ | ||||
struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out; | ||||
/** | ||||
* Did we tell ATS that this is our 'active' address? | ||||
*/ | ||||
int ats_active; | ||||
}; | }; | |||
/** | /** | |||
* Entry in neighbours. | * Entry in neighbours. | |||
*/ | */ | |||
struct NeighbourMapEntry | struct NeighbourMapEntry | |||
{ | { | |||
/** | /** | |||
* Head of list of messages we would like to send to this peer; | * Head of list of messages we would like to send to this peer; | |||
skipping to change at line 253 | skipping to change at line 430 | |||
* contain at most one message per client. | * contain at most one message per client. | |||
*/ | */ | |||
struct MessageQueue *messages_tail; | struct MessageQueue *messages_tail; | |||
/** | /** | |||
* Are we currently trying to send a message? If so, which one? | * Are we currently trying to send a message? If so, which one? | |||
*/ | */ | |||
struct MessageQueue *is_active; | struct MessageQueue *is_active; | |||
/** | /** | |||
* Active session for communicating with the peer. | * Primary address we currently use to communicate with the neighbour. | |||
*/ | */ | |||
struct Session *session; | struct NeighbourAddress primary_address; | |||
/** | /** | |||
* Address we currently use. | * Alternative address currently under consideration for communicating | |||
* with the neighbour. | ||||
*/ | */ | |||
struct GNUNET_HELLO_Address *address; | struct NeighbourAddress alternative_address; | |||
/** | /** | |||
* Identity of this neighbour. | * Identity of this neighbour. | |||
*/ | */ | |||
struct GNUNET_PeerIdentity id; | struct GNUNET_PeerIdentity id; | |||
/** | /** | |||
* ID of task scheduled to run when this peer is about to | * Main task that drives this peer (timeouts, keepalives, etc.). | |||
* time out (will free resources associated with the peer). | * Always runs the 'master_task'. | |||
*/ | ||||
GNUNET_SCHEDULER_TaskIdentifier task; | ||||
/** | ||||
* At what time should we sent the next keep-alive message? | ||||
*/ | ||||
struct GNUNET_TIME_Absolute keep_alive_time; | ||||
/** | ||||
* At what time did we sent the last keep-alive message? Used | ||||
* to calculate round-trip time ("latency"). | ||||
*/ | ||||
struct GNUNET_TIME_Absolute last_keep_alive_time; | ||||
/** | ||||
* Timestamp we should include in our next CONNECT_ACK message. | ||||
* (only valid if 'send_connect_ack' is GNUNET_YES). Used to build | ||||
* our CONNECT_ACK message. | ||||
*/ | */ | |||
GNUNET_SCHEDULER_TaskIdentifier timeout_task; | struct GNUNET_TIME_Absolute connect_ack_timestamp; | |||
/** | /** | |||
* ID of task scheduled to send keepalives. | * Time where we should cut the connection (timeout) if we don't | |||
* make progress in the state machine (or get a KEEPALIVE_RESPONSE | ||||
* if we are in S_CONNECTED). | ||||
*/ | */ | |||
GNUNET_SCHEDULER_TaskIdentifier keepalive_task; | struct GNUNET_TIME_Absolute timeout; | |||
/** | /** | |||
* ID of task scheduled to run when we should try transmitting | * Latest calculated latency value | |||
* the head of the message queue. | ||||
*/ | */ | |||
GNUNET_SCHEDULER_TaskIdentifier transmission_task; | struct GNUNET_TIME_Relative latency; | |||
/** | /** | |||
* Tracker for inbound bandwidth. | * Tracker for inbound bandwidth. | |||
*/ | */ | |||
struct GNUNET_BANDWIDTH_Tracker in_tracker; | struct GNUNET_BANDWIDTH_Tracker in_tracker; | |||
/** | /** | |||
* Inbound bandwidth from ATS, activated when connection is up | * How often has the other peer (recently) violated the inbound | |||
* traffic limit? Incremented by 10 per violation, decremented by 1 | ||||
* per non-violation (for each time interval). | ||||
*/ | */ | |||
struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in; | unsigned int quota_violation_count; | |||
/** | /** | |||
* Inbound bandwidth from ATS, activated when connection is up | * The current state of the peer. | |||
*/ | */ | |||
struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out; | enum State state; | |||
/** | /** | |||
* Timestamp of the 'SESSION_CONNECT' message we got from the other peer | * Did we sent an KEEP_ALIVE message and are we expecting a response? | |||
*/ | */ | |||
struct GNUNET_TIME_Absolute connect_ts; | int expect_latency_response; | |||
/** | /** | |||
* When did we sent the last keep-alive message? | * Flag to set if we still need to send a CONNECT_ACK message to the othe | |||
r peer | ||||
* (once we have an address to use and the peer has been allowed by our | ||||
* blacklist). Set to 1 if we need to send a CONNECT_ACK. Set to 2 if w | ||||
e | ||||
* did send a CONNECT_ACK and should go to 'S_CONNECTED' upon receiving | ||||
* a 'SESSION_ACK' (regardless of what our own state machine might say). | ||||
*/ | */ | |||
struct GNUNET_TIME_Absolute keep_alive_sent; | int send_connect_ack; | |||
}; | ||||
/** | ||||
* Context for blacklist checks and the 'handle_test_blacklist_cont' | ||||
* function. Stores information about ongoing blacklist checks. | ||||
*/ | ||||
struct BlackListCheckContext | ||||
{ | ||||
/** | /** | |||
* Latest calculated latency value | * We keep blacklist checks in a DLL. | |||
*/ | */ | |||
struct GNUNET_TIME_Relative latency; | struct BlackListCheckContext *next; | |||
/** | /** | |||
* Timeout for ATS | * We keep blacklist checks in a DLL. | |||
* We asked ATS for a new address for this peer | ||||
*/ | */ | |||
GNUNET_SCHEDULER_TaskIdentifier ats_suggest; | struct BlackListCheckContext *prev; | |||
/** | /** | |||
* Task the resets the peer state after due to an pending | * Address that is being checked. | |||
* unsuccessful connection setup | ||||
*/ | */ | |||
GNUNET_SCHEDULER_TaskIdentifier state_reset; | struct NeighbourAddress na; | |||
/** | /** | |||
* How often has the other peer (recently) violated the inbound | * ATS information about the address. | |||
* traffic limit? Incremented by 10 per violation, decremented by 1 | ||||
* per non-violation (for each time interval). | ||||
*/ | */ | |||
unsigned int quota_violation_count; | struct GNUNET_ATS_Information *ats; | |||
/** | /** | |||
* The current state of the peer | * Handle to the ongoing blacklist check. | |||
* Element of enum State | ||||
*/ | */ | |||
int state; | struct GST_BlacklistCheck *bc; | |||
/** | /** | |||
* Did we sent an KEEP_ALIVE message and are we expecting a response? | * Size of the 'ats' array. | |||
*/ | */ | |||
int expect_latency_response; | uint32_t ats_count; | |||
int address_state; | ||||
}; | }; | |||
/** | /** | |||
* All known neighbours and their HELLOs. | * Hash map from peer identities to the respective 'struct NeighbourMapEntr y'. | |||
*/ | */ | |||
static struct GNUNET_CONTAINER_MultiHashMap *neighbours; | static struct GNUNET_CONTAINER_MultiHashMap *neighbours; | |||
/** | /** | |||
* We keep blacklist checks in a DLL so that we can find | ||||
* the 'sessions' in their 'struct NeighbourAddress' if | ||||
* a session goes down. | ||||
*/ | ||||
static struct BlackListCheckContext *bc_head; | ||||
/** | ||||
* We keep blacklist checks in a DLL. | ||||
*/ | ||||
static struct BlackListCheckContext *bc_tail; | ||||
/** | ||||
* Closure for connect_notify_cb, disconnect_notify_cb and address_change_c b | * Closure for connect_notify_cb, disconnect_notify_cb and address_change_c b | |||
*/ | */ | |||
static void *callback_cls; | static void *callback_cls; | |||
/** | /** | |||
* Function to call when we connected to a neighbour. | * Function to call when we connected to a neighbour. | |||
*/ | */ | |||
static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb; | static GNUNET_TRANSPORT_NotifyConnect connect_notify_cb; | |||
/** | /** | |||
skipping to change at line 374 | skipping to change at line 593 | |||
static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb; | static GNUNET_TRANSPORT_NotifyDisconnect disconnect_notify_cb; | |||
/** | /** | |||
* Function to call when we changed an active address of a neighbour. | * Function to call when we changed an active address of a neighbour. | |||
*/ | */ | |||
static GNUNET_TRANSPORT_PeerIterateCallback address_change_cb; | static GNUNET_TRANSPORT_PeerIterateCallback address_change_cb; | |||
/** | /** | |||
* counter for connected neighbours | * counter for connected neighbours | |||
*/ | */ | |||
static int neighbours_connected; | static unsigned int neighbours_connected; | |||
/** | ||||
* Number of bytes we have currently queued for transmission. | ||||
*/ | ||||
static unsigned long long bytes_in_send_queue; | ||||
/** | /** | |||
* Lookup a neighbour entry in the neighbours hash map. | * Lookup a neighbour entry in the neighbours hash map. | |||
* | * | |||
* @param pid identity of the peer to look up | * @param pid identity of the peer to look up | |||
* @return the entry, NULL if there is no existing record | * @return the entry, NULL if there is no existing record | |||
*/ | */ | |||
static struct NeighbourMapEntry * | static struct NeighbourMapEntry * | |||
lookup_neighbour (const struct GNUNET_PeerIdentity *pid) | lookup_neighbour (const struct GNUNET_PeerIdentity *pid) | |||
{ | { | |||
if (NULL == neighbours) | ||||
return NULL; | ||||
return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey); | return GNUNET_CONTAINER_multihashmap_get (neighbours, &pid->hashPubKey); | |||
} | } | |||
/** | ||||
* Disconnect from the given neighbour, clean up the record. | ||||
* | ||||
* @param n neighbour to disconnect from | ||||
*/ | ||||
static void | ||||
disconnect_neighbour (struct NeighbourMapEntry *n); | ||||
#define change_state(n, state, ...) change (n, state, __LINE__) | ||||
static int | ||||
is_connecting (struct NeighbourMapEntry *n) | ||||
{ | ||||
if ((n->state > S_NOT_CONNECTED) && (n->state < S_CONNECTED)) | ||||
return GNUNET_YES; | ||||
return GNUNET_NO; | ||||
} | ||||
static int | ||||
is_connected (struct NeighbourMapEntry *n) | ||||
{ | ||||
if (n->state == S_CONNECTED) | ||||
return GNUNET_YES; | ||||
return GNUNET_NO; | ||||
} | ||||
static int | ||||
is_disconnecting (struct NeighbourMapEntry *n) | ||||
{ | ||||
if (n->state == S_DISCONNECT) | ||||
return GNUNET_YES; | ||||
return GNUNET_NO; | ||||
} | ||||
static const char * | static const char * | |||
print_state (int state) | print_state (int state) | |||
{ | { | |||
switch (state) | switch (state) | |||
{ | { | |||
case S_CONNECTED: | case S_NOT_CONNECTED: | |||
return "S_CONNECTED"; | return "S_NOT_CONNECTED"; | |||
break; | ||||
case S_INIT_ATS: | ||||
return "S_INIT_ATS"; | ||||
break; | break; | |||
case S_CONNECT_RECV: | case S_INIT_BLACKLIST: | |||
return "S_CONNECT_RECV"; | return "S_INIT_BLACKLIST"; | |||
break; | break; | |||
case S_CONNECT_SENT: | case S_CONNECT_SENT: | |||
return "S_CONNECT_SENT"; | return "S_CONNECT_SENT"; | |||
break; | break; | |||
case S_CONNECT_RECV_ATS: | ||||
return "S_CONNECT_RECV_ATS"; | ||||
break; | ||||
case S_CONNECT_RECV_BLACKLIST: | ||||
return "S_CONNECT_RECV_BLACKLIST"; | ||||
break; | ||||
case S_CONNECT_RECV_ACK: | ||||
return "S_CONNECT_RECV_ACK"; | ||||
break; | ||||
case S_CONNECTED: | ||||
return "S_CONNECTED"; | ||||
break; | ||||
case S_RECONNECT_ATS: | ||||
return "S_RECONNECT_ATS"; | ||||
break; | ||||
case S_RECONNECT_BLACKLIST: | ||||
return "S_RECONNECT_BLACKLIST"; | ||||
break; | ||||
case S_RECONNECT_SENT: | ||||
return "S_RECONNECT_SENT"; | ||||
break; | ||||
case S_CONNECTED_SWITCHING_BLACKLIST: | ||||
return "S_CONNECTED_SWITCHING_BLACKLIST"; | ||||
break; | ||||
case S_CONNECTED_SWITCHING_CONNECT_SENT: | ||||
return "S_CONNECTED_SWITCHING_CONNECT_SENT"; | ||||
break; | ||||
case S_DISCONNECT: | case S_DISCONNECT: | |||
return "S_DISCONNECT"; | return "S_DISCONNECT"; | |||
break; | break; | |||
case S_NOT_CONNECTED: | case S_DISCONNECT_FINISHED: | |||
return "S_NOT_CONNECTED"; | return "S_DISCONNECT_FINISHED"; | |||
break; | ||||
case S_FAST_RECONNECT: | ||||
return "S_FAST_RECONNECT"; | ||||
break; | break; | |||
default: | default: | |||
return "UNDEFINED"; | ||||
GNUNET_break (0); | GNUNET_break (0); | |||
break; | break; | |||
} | } | |||
return NULL; | GNUNET_break (0); | |||
} | return "UNDEFINED"; | |||
static int | ||||
change (struct NeighbourMapEntry *n, int state, int line); | ||||
static void | ||||
ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *t | ||||
c); | ||||
static void | ||||
reset_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc) | ||||
{ | ||||
struct NeighbourMapEntry *n = cls; | ||||
if (n == NULL) | ||||
return; | ||||
n->state_reset = GNUNET_SCHEDULER_NO_TASK; | ||||
if (n->state == S_CONNECTED) | ||||
return; | ||||
#if DEBUG_TRANSPORT | ||||
GNUNET_STATISTICS_update (GST_stats, | ||||
gettext_noop | ||||
("# failed connection attempts due to timeout") | ||||
, 1, | ||||
GNUNET_NO); | ||||
#endif | ||||
/* resetting state */ | ||||
if (n->state == S_FAST_RECONNECT) | ||||
{ | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Fast reconnect time out, disconnecting peer `%s'\n", | ||||
GNUNET_i2s (&n->id)); | ||||
disconnect_neighbour(n); | ||||
return; | ||||
} | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"State for neighbour `%s' %X changed from `%s' to `%s' in lin | ||||
e %u\n", | ||||
GNUNET_i2s (&n->id), n, print_state(n->state), "S_NOT_CONNECT | ||||
ED", __LINE__); | ||||
n->state = S_NOT_CONNECTED; | ||||
/* destroying address */ | ||||
if (n->address != NULL) | ||||
{ | ||||
GNUNET_assert (strlen (n->address->transport_name) > 0); | ||||
GNUNET_ATS_address_destroyed (GST_ats, n->address, n->session); | ||||
} | ||||
/* request new address */ | ||||
if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) | ||||
GNUNET_SCHEDULER_cancel (n->ats_suggest); | ||||
n->ats_suggest = | ||||
GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cance | ||||
l, | ||||
n); | ||||
GNUNET_ATS_suggest_address (GST_ats, &n->id); | ||||
} | } | |||
/** | ||||
* Test if we're connected to the given peer. | ||||
* | ||||
* @param n neighbour entry of peer to test | ||||
* @return GNUNET_YES if we are connected, GNUNET_NO if not | ||||
*/ | ||||
static int | static int | |||
change (struct NeighbourMapEntry *n, int state, int line) | test_connected (struct NeighbourMapEntry *n) | |||
{ | { | |||
int previous_state; | if (NULL == n) | |||
/* allowed transitions */ | return GNUNET_NO; | |||
int allowed = GNUNET_NO; | ||||
previous_state = n->state; | ||||
switch (n->state) | switch (n->state) | |||
{ | { | |||
case S_NOT_CONNECTED: | case S_NOT_CONNECTED: | |||
if ((state == S_CONNECT_RECV) || (state == S_CONNECT_SENT) || | case S_INIT_ATS: | |||
(state == S_DISCONNECT)) | case S_INIT_BLACKLIST: | |||
allowed = GNUNET_YES; | ||||
break; | ||||
case S_CONNECT_RECV: | ||||
allowed = GNUNET_YES; | ||||
break; | ||||
case S_CONNECT_SENT: | case S_CONNECT_SENT: | |||
allowed = GNUNET_YES; | case S_CONNECT_RECV_ATS: | |||
break; | case S_CONNECT_RECV_BLACKLIST: | |||
case S_CONNECT_RECV_ACK: | ||||
return GNUNET_NO; | ||||
case S_CONNECTED: | case S_CONNECTED: | |||
if ((state == S_DISCONNECT) || (state == S_FAST_RECONNECT)) | case S_RECONNECT_ATS: | |||
allowed = GNUNET_YES; | case S_RECONNECT_BLACKLIST: | |||
break; | case S_RECONNECT_SENT: | |||
case S_CONNECTED_SWITCHING_BLACKLIST: | ||||
case S_CONNECTED_SWITCHING_CONNECT_SENT: | ||||
return GNUNET_YES; | ||||
case S_DISCONNECT: | case S_DISCONNECT: | |||
break; | case S_DISCONNECT_FINISHED: | |||
case S_FAST_RECONNECT: | return GNUNET_NO; | |||
if ((state == S_CONNECTED) || (state == S_DISCONNECT)) | ||||
allowed = GNUNET_YES; | ||||
break; | ||||
default: | default: | |||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_st ate (n->state)); | ||||
GNUNET_break (0); | GNUNET_break (0); | |||
break; | break; | |||
} | } | |||
if (allowed == GNUNET_NO) | return GNUNET_SYSERR; | |||
{ | } | |||
char *old = GNUNET_strdup (print_state (n->state)); | ||||
char *new = GNUNET_strdup (print_state (state)); | ||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | /** | |||
"Illegal state transition from `%s' to `%s' in line %u \n", | * Send information about a new outbound quota to our clients. | |||
old, | * | |||
new, line); | * @param target affected peer | |||
GNUNET_break (0); | * @param quota new quota | |||
GNUNET_free (old); | */ | |||
GNUNET_free (new); | static void | |||
return GNUNET_SYSERR; | send_outbound_quota (const struct GNUNET_PeerIdentity *target, | |||
struct GNUNET_BANDWIDTH_Value32NBO quota) | ||||
{ | ||||
struct QuotaSetMessage q_msg; | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Sending outbound quota of %u Bps for peer `%s' to all client | ||||
s\n", | ||||
ntohl (quota.value__), GNUNET_i2s (target)); | ||||
q_msg.header.size = htons (sizeof (struct QuotaSetMessage)); | ||||
q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA); | ||||
q_msg.quota = quota; | ||||
q_msg.peer = (*target); | ||||
GST_clients_broadcast (&q_msg.header, GNUNET_NO); | ||||
} | ||||
/** | ||||
* We don't need a given neighbour address any more. | ||||
* Release its resources and give appropriate notifications | ||||
* to ATS and other subsystems. | ||||
* | ||||
* @param na address we are done with; 'na' itself must NOT be 'free'd, onl | ||||
y the contents! | ||||
*/ | ||||
static void | ||||
free_address (struct NeighbourAddress *na) | ||||
{ | ||||
if (GNUNET_YES == na->ats_active) | ||||
{ | ||||
GST_validation_set_address_use (na->address, na->session, GNUNET_NO, __ | ||||
LINE__); | ||||
GNUNET_ATS_address_in_use (GST_ats, na->address, na->session, GNUNET_NO | ||||
); | ||||
} | } | |||
na->ats_active = GNUNET_NO; | ||||
if (NULL != na->address) | ||||
{ | { | |||
char *old = GNUNET_strdup (print_state (n->state)); | GNUNET_HELLO_address_free (na->address); | |||
char *new = GNUNET_strdup (print_state (state)); | na->address = NULL; | |||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"State for neighbour `%s' %X changed from `%s' to `%s' in l | ||||
ine %u\n", | ||||
GNUNET_i2s (&n->id), n, old, new, line); | ||||
GNUNET_free (old); | ||||
GNUNET_free (new); | ||||
} | } | |||
n->state = state; | na->session = NULL; | |||
} | ||||
switch (n->state) | ||||
{ | ||||
case S_FAST_RECONNECT: | ||||
case S_CONNECT_RECV: | ||||
case S_CONNECT_SENT: | ||||
if (n->state_reset != GNUNET_SCHEDULER_NO_TASK) | ||||
GNUNET_SCHEDULER_cancel (n->state_reset); | ||||
n->state_reset = | ||||
GNUNET_SCHEDULER_add_delayed (SETUP_CONNECTION_TIMEOUT, &reset_task | ||||
, n); | ||||
break; | ||||
case S_CONNECTED: | ||||
case S_NOT_CONNECTED: | ||||
case S_DISCONNECT: | ||||
if (GNUNET_SCHEDULER_NO_TASK != n->state_reset) | ||||
{ | ||||
#if DEBUG_TRANSPORT | ||||
char *old = GNUNET_strdup (print_state (n->state)); | ||||
char *new = GNUNET_strdup (print_state (state)); | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Removed reset task for peer `%s' %s failed in state tran | ||||
sition `%s' -> `%s' \n", | ||||
GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), old, n | ||||
ew); | ||||
GNUNET_free (old); | ||||
GNUNET_free (new); | ||||
#endif | ||||
GNUNET_assert (n->state_reset != GNUNET_SCHEDULER_NO_TASK); | ||||
GNUNET_SCHEDULER_cancel (n->state_reset); | ||||
n->state_reset = GNUNET_SCHEDULER_NO_TASK; | ||||
} | ||||
break; | ||||
default: | ||||
GNUNET_assert (0); | ||||
} | ||||
if (NULL != address_change_cb) | ||||
{ | ||||
if (n->state == S_CONNECTED) | ||||
address_change_cb (callback_cls, &n->id, n->address); | ||||
else if (previous_state == S_CONNECTED) | ||||
address_change_cb (callback_cls, &n->id, NULL); | ||||
} | ||||
return GNUNET_OK; | ||||
} | ||||
static ssize_t | ||||
send_with_session (struct NeighbourMapEntry *n, | ||||
const char *msgbuf, size_t msgbuf_size, | ||||
uint32_t priority, | ||||
struct GNUNET_TIME_Relative timeout, | ||||
GNUNET_TRANSPORT_TransmitContinuation cont, void *cont_c | ||||
ls) | ||||
{ | ||||
struct GNUNET_TRANSPORT_PluginFunctions *papi; | ||||
size_t ret = GNUNET_SYSERR; | ||||
GNUNET_assert (n != NULL); | ||||
GNUNET_assert (n->session != NULL); | ||||
papi = GST_plugins_find (n->address->transport_name); | ||||
if (papi == NULL) | ||||
{ | ||||
if (cont != NULL) | ||||
cont (cont_cls, &n->id, GNUNET_SYSERR); | ||||
return GNUNET_SYSERR; | ||||
} | ||||
ret = papi->send (papi->cls, | ||||
n->session, | ||||
msgbuf, msgbuf_size, | ||||
0, | ||||
timeout, | ||||
cont, cont_cls); | ||||
if ((ret == -1) && (cont != NULL)) | ||||
cont (cont_cls, &n->id, GNUNET_SYSERR); | ||||
return ret; | ||||
} | ||||
/** | ||||
* Task invoked to start a transmission to another peer. | ||||
* | ||||
* @param cls the 'struct NeighbourMapEntry' | ||||
* @param tc scheduler context | ||||
*/ | ||||
static void | ||||
transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc | ||||
); | ||||
/** | /** | |||
* We're done with our transmission attempt, continue processing. | * Initialize the 'struct NeighbourAddress'. | |||
* | * | |||
* @param cls the 'struct MessageQueue' of the message | * @param na neighbour address to initialize | |||
* @param receiver intended receiver | * @param address address of the other peer, NULL if other peer | |||
* @param success whether it worked or not | * connected to us | |||
* @param session session to use (or NULL, in which case an | ||||
* address must be setup) | ||||
* @param bandwidth_in inbound quota to be used when connection is up | ||||
* @param bandwidth_out outbound quota to be used when connection is up | ||||
* @param is_active GNUNET_YES to mark this as the active address with ATS | ||||
*/ | */ | |||
static void | static void | |||
transmit_send_continuation (void *cls, | set_address (struct NeighbourAddress *na, | |||
const struct GNUNET_PeerIdentity *receiver, | const struct GNUNET_HELLO_Address *address, | |||
int success) | struct Session *session, | |||
struct GNUNET_BANDWIDTH_Value32NBO bandwidth_in, | ||||
struct GNUNET_BANDWIDTH_Value32NBO bandwidth_out, | ||||
int is_active) | ||||
{ | { | |||
struct MessageQueue *mq = cls; | struct GNUNET_TRANSPORT_PluginFunctions *papi; | |||
struct NeighbourMapEntry *n; | ||||
struct NeighbourMapEntry *tmp; | ||||
tmp = lookup_neighbour (receiver); | if (NULL == (papi = GST_plugins_find (address->transport_name))) | |||
n = mq->n; | ||||
if ((NULL != n) && (tmp != NULL) && (tmp == n)) | ||||
{ | { | |||
GNUNET_assert (n->is_active == mq); | GNUNET_break (0); | |||
n->is_active = NULL; | return; | |||
if (success == GNUNET_YES) | } | |||
if (session == na->session) | ||||
{ | ||||
na->bandwidth_in = bandwidth_in; | ||||
na->bandwidth_out = bandwidth_out; | ||||
if (is_active != na->ats_active) | ||||
{ | ||||
na->ats_active = is_active; | ||||
GNUNET_ATS_address_in_use (GST_ats, na->address, na->session, is_acti | ||||
ve); | ||||
GST_validation_set_address_use (na->address, na->session, is_active, | ||||
__LINE__); | ||||
} | ||||
if (GNUNET_YES == is_active) | ||||
{ | { | |||
GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK); | /* FIXME: is this the right place to set quotas? */ | |||
n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, | GST_neighbours_set_incoming_quota (&address->peer, bandwidth_in); | |||
n); | send_outbound_quota (&address->peer, bandwidth_out); | |||
} | } | |||
return; | ||||
} | ||||
free_address (na); | ||||
if (NULL == session) | ||||
session = papi->get_session (papi->cls, address); | ||||
if (NULL == session) | ||||
{ | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Failed to obtain new session for peer `%s' and address '%s | ||||
'\n", | ||||
GNUNET_i2s (&address->peer), GST_plugins_a2s (address)); | ||||
GNUNET_ATS_address_destroyed (GST_ats, address, NULL); | ||||
return; | ||||
} | ||||
na->address = GNUNET_HELLO_address_copy (address); | ||||
na->bandwidth_in = bandwidth_in; | ||||
na->bandwidth_out = bandwidth_out; | ||||
na->session = session; | ||||
na->ats_active = is_active; | ||||
if (GNUNET_YES == is_active) | ||||
{ | ||||
/* Telling ATS about new session */ | ||||
GNUNET_ATS_address_update (GST_ats, na->address, na->session, NULL, 0); | ||||
GNUNET_ATS_address_in_use (GST_ats, na->address, na->session, GNUNET_YE | ||||
S); | ||||
GST_validation_set_address_use (na->address, na->session, GNUNET_YES, | ||||
__LINE__); | ||||
/* FIXME: is this the right place to set quotas? */ | ||||
GST_neighbours_set_incoming_quota (&address->peer, bandwidth_in); | ||||
send_outbound_quota (&address->peer, bandwidth_out); | ||||
} | } | |||
#if DEBUG_TRANSPORT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sending message of type %u was %s\n | ||||
", | ||||
ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->typ | ||||
e), | ||||
(success == GNUNET_OK) ? "successful" : "FAILED"); | ||||
#endif | ||||
if (NULL != mq->cont) | ||||
mq->cont (mq->cont_cls, success); | ||||
GNUNET_free (mq); | ||||
} | } | |||
/** | /** | |||
* Check the ready list for the given neighbour and if a plugin is | * Free a neighbour map entry. | |||
* ready for transmission (and if we have a message), do so! | ||||
* | * | |||
* @param n target peer for which to transmit | * @param n entry to free | |||
* @param keep_sessions GNUNET_NO to tell plugin to terminate sessions, | ||||
* GNUNET_YES to keep all sessions | ||||
*/ | */ | |||
static void | static void | |||
try_transmission_to_peer (struct NeighbourMapEntry *n) | free_neighbour (struct NeighbourMapEntry *n, int keep_sessions) | |||
{ | { | |||
struct MessageQueue *mq; | struct MessageQueue *mq; | |||
struct GNUNET_TIME_Relative timeout; | struct GNUNET_TRANSPORT_PluginFunctions *papi; | |||
ssize_t ret; | ||||
if (n->is_active != NULL) | n->is_active = NULL; /* always free'd by its own continuation! */ | |||
{ | ||||
GNUNET_break (0); | /* fail messages currently in the queue */ | |||
return; /* transmission already pending */ | ||||
} | ||||
if (n->transmission_task != GNUNET_SCHEDULER_NO_TASK) | ||||
{ | ||||
GNUNET_break (0); | ||||
return; /* currently waiting for bandwidth */ | ||||
} | ||||
while (NULL != (mq = n->messages_head)) | while (NULL != (mq = n->messages_head)) | |||
{ | { | |||
timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout); | ||||
if (timeout.rel_value > 0) | ||||
break; | ||||
GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); | GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); | |||
n->is_active = mq; | if (NULL != mq->cont) | |||
mq->n = n; | mq->cont (mq->cont_cls, GNUNET_SYSERR); | |||
transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); /* timeout | GNUNET_free (mq); | |||
*/ | ||||
} | } | |||
if (NULL == mq) | /* It is too late to send other peer disconnect notifications, but at | |||
return; /* no more messages */ | least internally we need to get clean... */ | |||
if (GNUNET_YES == test_connected (n)) | ||||
if (n->address == NULL) | { | |||
{ | GNUNET_STATISTICS_set (GST_stats, | |||
#if DEBUG_TRANSPORT | gettext_noop ("# peers connected"), | |||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n", | --neighbours_connected, | |||
GNUNET_i2s (&n->id)); | GNUNET_NO); | |||
#endif | disconnect_notify_cb (callback_cls, &n->id); | |||
GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); | ||||
transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); | ||||
GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK); | ||||
n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n) | ||||
; | ||||
return; | ||||
} | } | |||
if (GST_plugins_find (n->address->transport_name) == NULL) | /* FIXME-PLUGIN-API: This does not seem to guarantee that all | |||
{ | transport sessions eventually get killed due to inactivity; they | |||
GNUNET_break (0); | MUST have their own timeout logic (but at least TCP doesn't have | |||
return; | one yet). Are we sure that EVERY 'session' of a plugin is | |||
} | actually cleaned up this way!? Note that if we are switching | |||
GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); | between two TCP sessions to the same peer, the existing plugin | |||
n->is_active = mq; | API gives us not even the means to selectively kill only one of | |||
mq->n = n; | them! Killing all sessions like this seems to be very, very | |||
wrong. */ | ||||
if ((GNUNET_NO == keep_sessions) && | ||||
(NULL != n->primary_address.address) && | ||||
(NULL != (papi = GST_plugins_find (n->primary_address.address->transp | ||||
ort_name)))) | ||||
papi->disconnect (papi->cls, &n->id); | ||||
if ((n->address->address_length == 0) && (n->session == NULL)) | n->state = S_DISCONNECT_FINISHED; | |||
{ | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "No address for peer `%s'\n", | GNUNET_assert (GNUNET_YES == | |||
GNUNET_i2s (&n->id)); | GNUNET_CONTAINER_multihashmap_remove (neighbours, | |||
transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); | &n->id.hashPubKey, n | |||
GNUNET_assert (n->transmission_task == GNUNET_SCHEDULER_NO_TASK); | )); | |||
n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n) | ||||
; | ||||
return; | ||||
} | ||||
ret = send_with_session(n, | /* cut transport-level connection */ | |||
mq->message_buf, mq->message_buf_size, | free_address (&n->primary_address); | |||
0, timeout, | free_address (&n->alternative_address); | |||
&transmit_send_continuation, mq); | ||||
// FIXME-ATS-API: we might want to be more specific about | ||||
// which states we do this from in the future (ATS should | ||||
// have given us a 'suggest_address' handle, and if we have | ||||
// such a handle, we should cancel the operation here! | ||||
GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id); | ||||
if (ret == -1) | if (GNUNET_SCHEDULER_NO_TASK != n->task) | |||
{ | { | |||
/* failure, but 'send' would not call continuation in this case, | GNUNET_SCHEDULER_cancel (n->task); | |||
* so we need to do it here! */ | n->task = GNUNET_SCHEDULER_NO_TASK; | |||
transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); | ||||
} | } | |||
/* free rest of memory */ | ||||
GNUNET_free (n); | ||||
} | } | |||
/** | /** | |||
* Task invoked to start a transmission to another peer. | * Transmit a message using the current session of the given | |||
* neighbour. | ||||
* | * | |||
* @param cls the 'struct NeighbourMapEntry' | * @param n entry for the recipient | |||
* @param tc scheduler context | * @param msgbuf buffer to transmit | |||
* @param msgbuf_size number of bytes in buffer | ||||
* @param priority transmission priority | ||||
* @param timeout transmission timeout | ||||
* @param cont continuation to call when finished (can be NULL) | ||||
* @param cont_cls closure for cont | ||||
*/ | */ | |||
static void | static void | |||
transmission_task (void *cls, const struct GNUNET_SCHEDULER_TaskContext *tc | send_with_session (struct NeighbourMapEntry *n, | |||
) | const char *msgbuf, size_t msgbuf_size, | |||
uint32_t priority, | ||||
struct GNUNET_TIME_Relative timeout, | ||||
GNUNET_TRANSPORT_TransmitContinuation cont, | ||||
void *cont_cls) | ||||
{ | { | |||
struct NeighbourMapEntry *n = cls; | struct GNUNET_TRANSPORT_PluginFunctions *papi; | |||
GNUNET_assert (NULL != lookup_neighbour (&n->id)); | GNUNET_assert (n->primary_address.session != NULL); | |||
n->transmission_task = GNUNET_SCHEDULER_NO_TASK; | if ( ( (NULL == (papi = GST_plugins_find (n->primary_address.address->tra | |||
try_transmission_to_peer (n); | nsport_name))) || | |||
(-1 == papi->send (papi->cls, | ||||
n->primary_address.session, | ||||
msgbuf, msgbuf_size, | ||||
priority, | ||||
timeout, | ||||
cont, cont_cls))) && | ||||
(NULL != cont) ) | ||||
cont (cont_cls, &n->id, GNUNET_SYSERR); | ||||
GNUNET_break (NULL != papi); | ||||
} | } | |||
/** | /** | |||
* Initialize the neighbours subsystem. | * Master task run for every neighbour. Performs all of the time-related | |||
* activities (keep alive, send next message, disconnect if idle, finish | ||||
* clean up after disconnect). | ||||
* | * | |||
* @param cls closure for callbacks | * @param cls the 'struct NeighbourMapEntry' for which we are running | |||
* @param connect_cb function to call if we connect to a peer | * @param tc scheduler context (unused) | |||
* @param disconnect_cb function to call if we disconnect from a peer | ||||
* @param peer_address_cb function to call if we change an active address | ||||
* of a neighbour | ||||
*/ | */ | |||
void | static void | |||
GST_neighbours_start (void *cls, | master_task (void *cls, | |||
GNUNET_TRANSPORT_NotifyConnect connect_cb, | const struct GNUNET_SCHEDULER_TaskContext *tc); | |||
GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb, | ||||
GNUNET_TRANSPORT_PeerIterateCallback peer_address_cb) | ||||
{ | ||||
callback_cls = cls; | ||||
connect_notify_cb = connect_cb; | ||||
disconnect_notify_cb = disconnect_cb; | ||||
address_change_cb = peer_address_cb; | ||||
neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE); | ||||
} | ||||
/** | ||||
* Function called when the 'DISCONNECT' message has been sent by the | ||||
* plugin. Frees the neighbour --- if the entry still exists. | ||||
* | ||||
* @param cls NULL | ||||
* @param target identity of the neighbour that was disconnected | ||||
* @param result GNUNET_OK if the disconnect got out successfully | ||||
*/ | ||||
static void | static void | |||
send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target, | send_disconnect_cont (void *cls, const struct GNUNET_PeerIdentity *target, | |||
int result) | int result) | |||
{ | { | |||
#if DEBUG_TRANSPORT | struct NeighbourMapEntry *n; | |||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Sending DISCONNECT message to peer `%4s': %i\n", | n = lookup_neighbour (target); | |||
GNUNET_i2s (target), result); | if (NULL == n) | |||
#endif | return; /* already gone */ | |||
if (S_DISCONNECT != n->state) | ||||
return; /* have created a fresh entry since */ | ||||
n->state = S_DISCONNECT; | ||||
if (GNUNET_SCHEDULER_NO_TASK != n->task) | ||||
GNUNET_SCHEDULER_cancel (n->task); | ||||
n->task = GNUNET_SCHEDULER_add_now (&master_task, n); | ||||
} | } | |||
static int | /** | |||
send_disconnect (struct NeighbourMapEntry * n) | * Transmit a DISCONNECT message to the other peer. | |||
* | ||||
* @param n neighbour to send DISCONNECT message. | ||||
*/ | ||||
static void | ||||
send_disconnect (struct NeighbourMapEntry *n) | ||||
{ | { | |||
size_t ret; | ||||
struct SessionDisconnectMessage disconnect_msg; | struct SessionDisconnectMessage disconnect_msg; | |||
#if DEBUG_TRANSPORT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
"Sending DISCONNECT message to peer `%4s'\n", | "Sending DISCONNECT message to peer `%4s'\n", | |||
GNUNET_i2s (&n->id)); | GNUNET_i2s (&n->id)); | |||
#endif | ||||
disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessa ge)); | disconnect_msg.header.size = htons (sizeof (struct SessionDisconnectMessa ge)); | |||
disconnect_msg.header.type = | disconnect_msg.header.type = | |||
htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT); | htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT); | |||
disconnect_msg.reserved = htonl (0); | disconnect_msg.reserved = htonl (0); | |||
disconnect_msg.purpose.size = | disconnect_msg.purpose.size = | |||
htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) + | htonl (sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) + | |||
sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) + | sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) + | |||
sizeof (struct GNUNET_TIME_AbsoluteNBO)); | sizeof (struct GNUNET_TIME_AbsoluteNBO)); | |||
disconnect_msg.purpose.purpose = | disconnect_msg.purpose.purpose = | |||
htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT); | htonl (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT); | |||
disconnect_msg.timestamp = | disconnect_msg.timestamp = | |||
GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); | GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); | |||
disconnect_msg.public_key = GST_my_public_key; | disconnect_msg.public_key = GST_my_public_key; | |||
GNUNET_assert (GNUNET_OK == | GNUNET_assert (GNUNET_OK == | |||
GNUNET_CRYPTO_rsa_sign (GST_my_private_key, | GNUNET_CRYPTO_rsa_sign (GST_my_private_key, | |||
&disconnect_msg.purpose, | &disconnect_msg.purpose, | |||
&disconnect_msg.signature)); | &disconnect_msg.signature)); | |||
ret = send_with_session (n, | send_with_session (n, | |||
(const char *) &disconnect_msg, sizeof (disconnect_msg), | (const char *) &disconnect_msg, sizeof (disconnect_msg) | |||
UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, | , | |||
&send_disconnect_cont, NULL); | UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, | |||
&send_disconnect_cont, NULL); | ||||
if (ret == GNUNET_SYSERR) | ||||
return GNUNET_SYSERR; | ||||
GNUNET_STATISTICS_update (GST_stats, | GNUNET_STATISTICS_update (GST_stats, | |||
gettext_noop | gettext_noop | |||
("# peers disconnected due to external request" ), 1, | ("# DISCONNECT messages sent"), 1, | |||
GNUNET_NO); | GNUNET_NO); | |||
return GNUNET_OK; | ||||
} | } | |||
/** | /** | |||
* Disconnect from the given neighbour, clean up the record. | * Disconnect from the given neighbour, clean up the record. | |||
* | * | |||
* @param n neighbour to disconnect from | * @param n neighbour to disconnect from | |||
*/ | */ | |||
static void | static void | |||
disconnect_neighbour (struct NeighbourMapEntry *n) | disconnect_neighbour (struct NeighbourMapEntry *n) | |||
{ | { | |||
struct MessageQueue *mq; | /* depending on state, notify neighbour and/or upper layers of this peer | |||
int previous_state; | about disconnect */ | |||
switch (n->state) | ||||
previous_state = n->state; | ||||
if (is_disconnecting (n)) | ||||
return; | ||||
/* send DISCONNECT MESSAGE */ | ||||
if (previous_state == S_CONNECTED) | ||||
{ | ||||
if (GNUNET_OK == send_disconnect (n)) | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Sent DISCONNECT_MSG to `%s'\n", | ||||
GNUNET_i2s (&n->id)); | ||||
else | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Could not send DISCONNECT_MSG to `%s'\n", | ||||
GNUNET_i2s (&n->id)); | ||||
} | ||||
change_state (n, S_DISCONNECT); | ||||
if (previous_state == S_CONNECTED) | ||||
{ | ||||
GNUNET_assert (NULL != n->address); | ||||
if (n->address_state == USED) | ||||
{ | ||||
GST_validation_set_address_use (n->address, n->session, GNUNET_NO); | ||||
GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO | ||||
); | ||||
n->address_state = UNUSED; | ||||
} | ||||
} | ||||
if (n->address != NULL) | ||||
{ | ||||
struct GNUNET_TRANSPORT_PluginFunctions *papi; | ||||
papi = GST_plugins_find (n->address->transport_name); | ||||
if (papi != NULL) | ||||
papi->disconnect (papi->cls, &n->id); | ||||
} | ||||
while (NULL != (mq = n->messages_head)) | ||||
{ | ||||
GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); | ||||
if (NULL != mq->cont) | ||||
mq->cont (mq->cont_cls, GNUNET_SYSERR); | ||||
GNUNET_free (mq); | ||||
} | ||||
if (NULL != n->is_active) | ||||
{ | ||||
n->is_active->n = NULL; | ||||
n->is_active = NULL; | ||||
} | ||||
switch (previous_state) | ||||
{ | { | |||
case S_NOT_CONNECTED: | ||||
case S_INIT_ATS: | ||||
case S_INIT_BLACKLIST: | ||||
/* other peer is completely unaware of us, no need to send DISCONNECT * | ||||
/ | ||||
n->state = S_DISCONNECT_FINISHED; | ||||
free_neighbour (n, GNUNET_NO); | ||||
return; | ||||
case S_CONNECT_SENT: | ||||
send_disconnect (n); | ||||
n->state = S_DISCONNECT; | ||||
break; | ||||
case S_CONNECT_RECV_ATS: | ||||
case S_CONNECT_RECV_BLACKLIST: | ||||
/* we never ACK'ed the other peer's request, no need to send DISCONNECT | ||||
*/ | ||||
n->state = S_DISCONNECT_FINISHED; | ||||
free_neighbour (n, GNUNET_NO); | ||||
return; | ||||
case S_CONNECT_RECV_ACK: | ||||
/* we DID ACK the other peer's request, must send DISCONNECT */ | ||||
send_disconnect (n); | ||||
n->state = S_DISCONNECT; | ||||
break; | ||||
case S_CONNECTED: | case S_CONNECTED: | |||
GNUNET_assert (neighbours_connected > 0); | case S_RECONNECT_BLACKLIST: | |||
neighbours_connected--; | case S_RECONNECT_SENT: | |||
GNUNET_assert (GNUNET_SCHEDULER_NO_TASK != n->keepalive_task); | case S_CONNECTED_SWITCHING_BLACKLIST: | |||
GNUNET_SCHEDULER_cancel (n->keepalive_task); | case S_CONNECTED_SWITCHING_CONNECT_SENT: | |||
n->keepalive_task = GNUNET_SCHEDULER_NO_TASK; | /* we are currently connected, need to send disconnect and do | |||
n->expect_latency_response = GNUNET_NO; | internal notifications and update statistics */ | |||
GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected") | send_disconnect (n); | |||
, -1, | GNUNET_STATISTICS_set (GST_stats, | |||
GNUNET_NO); | gettext_noop ("# peers connected"), | |||
--neighbours_connected, | ||||
GNUNET_NO); | ||||
disconnect_notify_cb (callback_cls, &n->id); | disconnect_notify_cb (callback_cls, &n->id); | |||
n->state = S_DISCONNECT; | ||||
break; | break; | |||
case S_FAST_RECONNECT: | case S_RECONNECT_ATS: | |||
GNUNET_STATISTICS_update (GST_stats, | /* ATS address request timeout, disconnect without sending disconnect m | |||
gettext_noop ("# fast reconnects failed"), 1, | essage */ | |||
GNUNET_NO); | GNUNET_STATISTICS_set (GST_stats, | |||
gettext_noop ("# peers connected"), | ||||
--neighbours_connected, | ||||
GNUNET_NO); | ||||
disconnect_notify_cb (callback_cls, &n->id); | disconnect_notify_cb (callback_cls, &n->id); | |||
n->state = S_DISCONNECT; | ||||
break; | ||||
case S_DISCONNECT: | ||||
/* already disconnected, ignore */ | ||||
break; | ||||
case S_DISCONNECT_FINISHED: | ||||
/* already cleaned up, how did we get here!? */ | ||||
GNUNET_assert (0); | ||||
break; | break; | |||
default: | default: | |||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_st | ||||
ate (n->state)); | ||||
GNUNET_break (0); | ||||
break; | break; | |||
} | } | |||
/* schedule timeout to clean up */ | ||||
GNUNET_ATS_suggest_address_cancel (GST_ats, &n->id); | if (GNUNET_SCHEDULER_NO_TASK != n->task) | |||
GNUNET_SCHEDULER_cancel (n->task); | ||||
GNUNET_assert (GNUNET_YES == | n->task = GNUNET_SCHEDULER_add_delayed (DISCONNECT_SENT_TIMEOUT, | |||
GNUNET_CONTAINER_multihashmap_remove (neighbours, | &master_task, n); | |||
&n->id.hashPubKey, n | ||||
)); | ||||
if (GNUNET_SCHEDULER_NO_TASK != n->ats_suggest) | ||||
{ | ||||
GNUNET_SCHEDULER_cancel (n->ats_suggest); | ||||
n->ats_suggest = GNUNET_SCHEDULER_NO_TASK; | ||||
} | ||||
if (GNUNET_SCHEDULER_NO_TASK != n->timeout_task) | ||||
{ | ||||
GNUNET_SCHEDULER_cancel (n->timeout_task); | ||||
n->timeout_task = GNUNET_SCHEDULER_NO_TASK; | ||||
} | ||||
if (GNUNET_SCHEDULER_NO_TASK != n->transmission_task) | ||||
{ | ||||
GNUNET_SCHEDULER_cancel (n->transmission_task); | ||||
n->transmission_task = GNUNET_SCHEDULER_NO_TASK; | ||||
} | ||||
if (NULL != n->address) | ||||
{ | ||||
GNUNET_HELLO_address_free (n->address); | ||||
n->address = NULL; | ||||
} | ||||
n->session = NULL; | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Deleting peer `%4s', %X\n", | ||||
GNUNET_i2s (&n->id), n); | ||||
GNUNET_free (n); | ||||
} | } | |||
/** | /** | |||
* Peer has been idle for too long. Disconnect. | * We're done with our transmission attempt, continue processing. | |||
* | * | |||
* @param cls the 'struct NeighbourMapEntry' of the neighbour that went idl | * @param cls the 'struct MessageQueue' of the message | |||
e | * @param receiver intended receiver | |||
* @param tc scheduler context | * @param success whether it worked or not | |||
*/ | */ | |||
static void | static void | |||
neighbour_timeout_task (void *cls, | transmit_send_continuation (void *cls, | |||
const struct GNUNET_SCHEDULER_TaskContext *tc) | const struct GNUNET_PeerIdentity *receiver, | |||
int success) | ||||
{ | { | |||
struct NeighbourMapEntry *n = cls; | struct MessageQueue *mq = cls; | |||
struct NeighbourMapEntry *n; | ||||
n->timeout_task = GNUNET_SCHEDULER_NO_TASK; | ||||
GNUNET_STATISTICS_update (GST_stats, | if (NULL == (n = lookup_neighbour (receiver))) | |||
gettext_noop | { | |||
("# peers disconnected due to timeout"), 1, | GNUNET_free (mq); | |||
GNUNET_NO); | return; /* disconnect or other error while transmitting, can happen */ | |||
disconnect_neighbour (n); | } | |||
if (n->is_active == mq) | ||||
{ | ||||
/* this is still "our" neighbour, remove us from its queue | ||||
and allow it to send the next message now */ | ||||
n->is_active = NULL; | ||||
if (GNUNET_SCHEDULER_NO_TASK != n->task) | ||||
GNUNET_SCHEDULER_cancel (n->task); | ||||
n->task = GNUNET_SCHEDULER_add_now (&master_task, n); | ||||
} | ||||
GNUNET_assert (bytes_in_send_queue >= mq->message_buf_size); | ||||
bytes_in_send_queue -= mq->message_buf_size; | ||||
GNUNET_STATISTICS_set (GST_stats, | ||||
gettext_noop | ||||
("# bytes in message queue for other peers"), | ||||
bytes_in_send_queue, GNUNET_NO); | ||||
if (GNUNET_OK == success) | ||||
GNUNET_STATISTICS_update (GST_stats, | ||||
gettext_noop | ||||
("# messages transmitted to other peers"), | ||||
1, GNUNET_NO); | ||||
else | ||||
GNUNET_STATISTICS_update (GST_stats, | ||||
gettext_noop | ||||
("# transmission failures for messages to othe | ||||
r peers"), | ||||
1, GNUNET_NO); | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Sending message to `%s' of type %u was a %s\n", | ||||
GNUNET_i2s (receiver), | ||||
ntohs (((struct GNUNET_MessageHeader *) mq->message_buf)->typ | ||||
e), | ||||
(success == GNUNET_OK) ? "success" : "FAILURE"); | ||||
if (NULL != mq->cont) | ||||
mq->cont (mq->cont_cls, success); | ||||
GNUNET_free (mq); | ||||
} | } | |||
/** | /** | |||
* Send another keepalive message. | * Check the message list for the given neighbour and if we can | |||
* send a message, do so. This function should only be called | ||||
* if the connection is at least generally ready for transmission. | ||||
* While we will only send one message at a time, no bandwidth | ||||
* quota management is performed here. If a message was given to | ||||
* the plugin, the continuation will automatically re-schedule | ||||
* the 'master' task once the next message might be transmitted. | ||||
* | * | |||
* @param cls the 'struct NeighbourMapEntry' of the neighbour that went idl | * @param n target peer for which to transmit | |||
e | ||||
* @param tc scheduler context | ||||
*/ | */ | |||
static void | static void | |||
neighbour_keepalive_task (void *cls, | try_transmission_to_peer (struct NeighbourMapEntry *n) | |||
const struct GNUNET_SCHEDULER_TaskContext *tc) | ||||
{ | { | |||
struct NeighbourMapEntry *n = cls; | struct MessageQueue *mq; | |||
struct GNUNET_MessageHeader m; | struct GNUNET_TIME_Relative timeout; | |||
int ret; | ||||
GNUNET_assert (S_CONNECTED == n->state); | ||||
n->keepalive_task = | ||||
GNUNET_SCHEDULER_add_delayed (KEEPALIVE_FREQUENCY, | ||||
&neighbour_keepalive_task, n); | ||||
GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), | if (NULL == n->primary_address.address) | |||
1, | { | |||
GNUNET_NO); | /* no address, why are we here? */ | |||
m.size = htons (sizeof (struct GNUNET_MessageHeader)); | GNUNET_break (0); | |||
m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE); | return; | |||
} | ||||
ret = send_with_session (n, | if ((0 == n->primary_address.address->address_length) && | |||
(const void *) &m, sizeof (m), | (NULL == n->primary_address.session)) | |||
UINT32_MAX /* priority */ , | { | |||
GNUNET_TIME_UNIT_FOREVER_REL, | /* no address, why are we here? */ | |||
NULL, NULL); | GNUNET_break (0); | |||
return; | ||||
n->expect_latency_response = GNUNET_NO; | } | |||
n->keep_alive_sent = GNUNET_TIME_absolute_get_zero (); | if (NULL != n->is_active) | |||
if (ret != GNUNET_SYSERR) | ||||
{ | { | |||
n->expect_latency_response = GNUNET_YES; | /* transmission already pending */ | |||
n->keep_alive_sent = GNUNET_TIME_absolute_get (); | return; | |||
} | } | |||
/* timeout messages from the queue that are past their due date */ | ||||
while (NULL != (mq = n->messages_head)) | ||||
{ | ||||
timeout = GNUNET_TIME_absolute_get_remaining (mq->timeout); | ||||
if (timeout.rel_value > 0) | ||||
break; | ||||
GNUNET_STATISTICS_update (GST_stats, | ||||
gettext_noop | ||||
("# messages timed out while in transport queu | ||||
e"), | ||||
1, GNUNET_NO); | ||||
GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); | ||||
n->is_active = mq; | ||||
transmit_send_continuation (mq, &n->id, GNUNET_SYSERR); /* timeout | ||||
*/ | ||||
} | ||||
if (NULL == mq) | ||||
return; /* no more messages */ | ||||
GNUNET_CONTAINER_DLL_remove (n->messages_head, n->messages_tail, mq); | ||||
n->is_active = mq; | ||||
send_with_session (n, | ||||
mq->message_buf, mq->message_buf_size, | ||||
0 /* priority */, timeout, | ||||
&transmit_send_continuation, mq); | ||||
} | } | |||
/** | /** | |||
* Disconnect from the given neighbour. | * Send keepalive message to the neighbour. Must only be called | |||
* if we are on 'connected' state. Will internally determine | ||||
* if a keepalive is truly needed (so can always be called). | ||||
* | * | |||
* @param cls unused | * @param n neighbour that went idle and needs a keepalive | |||
* @param key hash of neighbour's public key (not used) | ||||
* @param value the 'struct NeighbourMapEntry' of the neighbour | ||||
*/ | */ | |||
static int | ||||
disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *va | ||||
lue) | ||||
{ | ||||
struct NeighbourMapEntry *n = value; | ||||
#if DEBUG_TRANSPORT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s', %s\n", | ||||
GNUNET_i2s (&n->id), "SHUTDOWN_TASK"); | ||||
#endif | ||||
if (S_CONNECTED == n->state) | ||||
GNUNET_STATISTICS_update (GST_stats, | ||||
gettext_noop | ||||
("# peers disconnected due to global disconne | ||||
ct"), | ||||
1, GNUNET_NO); | ||||
disconnect_neighbour (n); | ||||
return GNUNET_OK; | ||||
} | ||||
static void | static void | |||
ats_suggest_cancel (void *cls, const struct GNUNET_SCHEDULER_TaskContext *t c) | send_keepalive (struct NeighbourMapEntry *n) | |||
{ | { | |||
struct NeighbourMapEntry *n = cls; | struct GNUNET_MessageHeader m; | |||
n->ats_suggest = GNUNET_SCHEDULER_NO_TASK; | ||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||||
"ATS did not suggested address to connect to peer `%s'\n", | ||||
GNUNET_i2s (&n->id)); | ||||
disconnect_neighbour (n); | GNUNET_assert (S_CONNECTED == n->state); | |||
if (GNUNET_TIME_absolute_get_remaining (n->keep_alive_time).rel_value > 0 | ||||
) | ||||
return; /* no keepalive needed at this time */ | ||||
m.size = htons (sizeof (struct GNUNET_MessageHeader)); | ||||
m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE); | ||||
send_with_session (n, | ||||
(const void *) &m, sizeof (m), | ||||
UINT32_MAX /* priority */, | ||||
KEEPALIVE_FREQUENCY, | ||||
NULL, NULL); | ||||
GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# keepalives sent"), | ||||
1, | ||||
GNUNET_NO); | ||||
n->expect_latency_response = GNUNET_YES; | ||||
n->last_keep_alive_time = GNUNET_TIME_absolute_get (); | ||||
n->keep_alive_time = GNUNET_TIME_relative_to_absolute (KEEPALIVE_FREQUENC | ||||
Y); | ||||
} | } | |||
/** | /** | |||
* Cleanup the neighbours subsystem. | * Keep the connection to the given neighbour alive longer, | |||
* we received a KEEPALIVE (or equivalent); send a response. | ||||
* | ||||
* @param neighbour neighbour to keep alive (by sending keep alive response | ||||
) | ||||
*/ | */ | |||
void | void | |||
GST_neighbours_stop () | GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour) | |||
{ | { | |||
// This can happen during shutdown | struct NeighbourMapEntry *n; | |||
if (neighbours == NULL) | struct GNUNET_MessageHeader m; | |||
if (NULL == (n = lookup_neighbour (neighbour))) | ||||
{ | ||||
GNUNET_STATISTICS_update (GST_stats, | ||||
gettext_noop | ||||
("# KEEPALIVE messages discarded (peer unknow | ||||
n)"), | ||||
1, GNUNET_NO); | ||||
return; | ||||
} | ||||
if (NULL == n->primary_address.session) | ||||
{ | { | |||
GNUNET_STATISTICS_update (GST_stats, | ||||
gettext_noop | ||||
("# KEEPALIVE messages discarded (no session) | ||||
"), | ||||
1, GNUNET_NO); | ||||
return; | return; | |||
} | } | |||
/* send reply to allow neighbour to measure latency */ | ||||
GNUNET_CONTAINER_multihashmap_iterate (neighbours, &disconnect_all_neighb | m.size = htons (sizeof (struct GNUNET_MessageHeader)); | |||
ours, | m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE_RESPONSE) | |||
NULL); | ; | |||
GNUNET_CONTAINER_multihashmap_destroy (neighbours); | send_with_session(n, | |||
// GNUNET_assert (neighbours_connected == 0); | (const void *) &m, sizeof (m), | |||
neighbours = NULL; | UINT32_MAX /* priority */, | |||
callback_cls = NULL; | KEEPALIVE_FREQUENCY, | |||
connect_notify_cb = NULL; | NULL, NULL); | |||
disconnect_notify_cb = NULL; | ||||
address_change_cb = NULL; | ||||
} | ||||
struct ContinutionContext | ||||
{ | ||||
struct GNUNET_HELLO_Address *address; | ||||
struct Session *session; | ||||
}; | ||||
static void | ||||
send_outbound_quota (const struct GNUNET_PeerIdentity *target, | ||||
struct GNUNET_BANDWIDTH_Value32NBO quota) | ||||
{ | ||||
struct QuotaSetMessage q_msg; | ||||
#if DEBUG_TRANSPORT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Sending outbound quota of %u Bps for peer `%s' to all client | ||||
s\n", | ||||
ntohl (quota.value__), GNUNET_i2s (target)); | ||||
#endif | ||||
q_msg.header.size = htons (sizeof (struct QuotaSetMessage)); | ||||
q_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SET_QUOTA); | ||||
q_msg.quota = quota; | ||||
q_msg.peer = (*target); | ||||
GST_clients_broadcast (&q_msg.header, GNUNET_NO); | ||||
} | } | |||
/** | /** | |||
* We tried to send a SESSION_CONNECT message to another peer. If this | * We received a KEEP_ALIVE_RESPONSE message and use this to calculate | |||
* succeeded, we change the state. If it failed, we should tell | * latency to this peer. Pass the updated information (existing ats | |||
* ATS to not use this address anymore (until it is re-validated). | * plus calculated latency) to ATS. | |||
* | * | |||
* @param cls the 'struct GNUNET_HELLO_Address' of the address that was tri | * @param neighbour neighbour to keep alive | |||
ed | * @param ats performance data | |||
* @param target peer to send the message to | * @param ats_count number of entries in ats | |||
* @param success GNUNET_OK on success | ||||
*/ | */ | |||
static void | void | |||
send_connect_continuation (void *cls, const struct GNUNET_PeerIdentity *tar | GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighb | |||
get, | our, | |||
int success) | const struct GNUNET_ATS_Information *ats | |||
, | ||||
uint32_t ats_count) | ||||
{ | { | |||
struct ContinutionContext *cc = cls; | struct NeighbourMapEntry *n; | |||
struct NeighbourMapEntry *n = lookup_neighbour (&cc->address->peer); | uint32_t latency; | |||
struct GNUNET_ATS_Information ats_new[ats_count + 1]; | ||||
if (GNUNET_YES != success) | if (NULL == (n = lookup_neighbour (neighbour))) | |||
{ | ||||
GNUNET_assert (strlen (cc->address->transport_name) > 0); | ||||
GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session); | ||||
} | ||||
if ((NULL == neighbours) || (NULL == n) || (n->state == S_DISCONNECT)) | ||||
{ | { | |||
GNUNET_HELLO_address_free (cc->address); | GNUNET_STATISTICS_update (GST_stats, | |||
GNUNET_free (cc); | gettext_noop | |||
("# KEEPALIVE_RESPONSE messages discarded (no | ||||
t connected)"), | ||||
1, GNUNET_NO); | ||||
return; | return; | |||
} | } | |||
if ( (S_CONNECTED != n->state) || | ||||
if ((GNUNET_YES == success) && | (GNUNET_YES != n->expect_latency_response) ) | |||
((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT))) | ||||
{ | { | |||
change_state (n, S_CONNECT_SENT); | GNUNET_STATISTICS_update (GST_stats, | |||
GNUNET_HELLO_address_free (cc->address); | gettext_noop | |||
GNUNET_free (cc); | ("# KEEPALIVE_RESPONSE messages discarded (no | |||
t expected)"), | ||||
1, GNUNET_NO); | ||||
return; | return; | |||
} | } | |||
n->expect_latency_response = GNUNET_NO; | ||||
if ((GNUNET_NO == success) && | n->latency = GNUNET_TIME_absolute_get_duration (n->last_keep_alive_time); | |||
((n->state == S_NOT_CONNECTED) || (n->state == S_CONNECT_SENT))) | n->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONN | |||
{ | ECTION_TIMEOUT); | |||
#if DEBUG_TRANSPORT | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | "Latency for peer `%s' is %llu ms\n", | |||
"Failed to send CONNECT_MSG to peer `%4s' with address '%s' | GNUNET_i2s (&n->id), n->latency.rel_value); | |||
session %p, asking ATS for new address \n", | memcpy (ats_new, ats, sizeof (struct GNUNET_ATS_Information) * ats_count) | |||
GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->sessi | ; | |||
on); | /* append latency */ | |||
#endif | ats_new[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY); | |||
change_state (n, S_NOT_CONNECTED); | if (n->latency.rel_value > UINT32_MAX) | |||
if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) | latency = UINT32_MAX; | |||
GNUNET_SCHEDULER_cancel (n->ats_suggest); | else | |||
n->ats_suggest = | latency = n->latency.rel_value; | |||
GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, &ats_suggest_ca | ats_new[ats_count].value = htonl (latency); | |||
ncel, | GNUNET_ATS_address_update (GST_ats, | |||
n); | n->primary_address.address, | |||
GNUNET_ATS_suggest_address (GST_ats, &n->id); | n->primary_address.session, ats_new, | |||
} | ats_count + 1); | |||
GNUNET_HELLO_address_free (cc->address); | ||||
GNUNET_free (cc); | ||||
} | } | |||
/** | /** | |||
* We tried to switch addresses with an peer already connected. If it faile | * We have received a message from the given sender. How long should | |||
d, | * we delay before receiving more? (Also used to keep the peer marked | |||
* we should tell ATS to not use this address anymore (until it is re-valid | * as live). | |||
ated). | ||||
* | * | |||
* @param cls the 'struct NeighbourMapEntry' | * @param sender sender of the message | |||
* @param target peer to send the message to | * @param size size of the message | |||
* @param success GNUNET_OK on success | * @param do_forward set to GNUNET_YES if the message should be forwarded t | |||
o clients | ||||
* GNUNET_NO if the neighbour is not connected or violate | ||||
s the quota, | ||||
* GNUNET_SYSERR if the connection is not fully up yet | ||||
* @return how long to wait before reading more from this sender | ||||
*/ | */ | |||
static void | struct GNUNET_TIME_Relative | |||
send_switch_address_continuation (void *cls, | GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity | |||
const struct GNUNET_PeerIdentity *target, | *sender, ssize_t size, int *do_forw | |||
int success) | ard) | |||
{ | { | |||
struct ContinutionContext *cc = cls; | ||||
struct NeighbourMapEntry *n; | struct NeighbourMapEntry *n; | |||
struct GNUNET_TIME_Relative ret; | ||||
if (neighbours == NULL) | if (NULL == neighbours) | |||
{ | { | |||
GNUNET_HELLO_address_free (cc->address); | *do_forward = GNUNET_NO; | |||
GNUNET_free (cc); | return GNUNET_TIME_UNIT_FOREVER_REL; /* This can happen during shutdown | |||
return; /* neighbour is going away */ | */ | |||
} | } | |||
if (NULL == (n = lookup_neighbour (sender))) | ||||
n = lookup_neighbour (&cc->address->peer); | { | |||
if ((n == NULL) || (is_disconnecting (n))) | GST_neighbours_try_connect (sender); | |||
if (NULL == (n = lookup_neighbour (sender))) | ||||
{ | ||||
GNUNET_STATISTICS_update (GST_stats, | ||||
gettext_noop | ||||
("# messages discarded due to lack of neigh | ||||
bour record"), | ||||
1, GNUNET_NO); | ||||
*do_forward = GNUNET_NO; | ||||
return GNUNET_TIME_UNIT_ZERO; | ||||
} | ||||
} | ||||
if (! test_connected (n)) | ||||
{ | { | |||
GNUNET_HELLO_address_free (cc->address); | *do_forward = GNUNET_SYSERR; | |||
GNUNET_free (cc); | return GNUNET_TIME_UNIT_ZERO; | |||
return; /* neighbour is going away */ | ||||
} | } | |||
if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size) | ||||
GNUNET_assert ((n->state == S_CONNECTED) || (n->state == S_FAST_RECONNECT | ) | |||
)); | ||||
if (GNUNET_YES != success) | ||||
{ | { | |||
#if DEBUG_TRANSPORT | n->quota_violation_count++; | |||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
"Failed to switch connected peer `%s' to address '%s' sessi | "Bandwidth quota (%u b/s) violation detected (total of %u). | |||
on %X, asking ATS for new address \n", | \n", | |||
GNUNET_i2s (&n->id), GST_plugins_a2s (cc->address), cc->ses | n->in_tracker.available_bytes_per_s__, | |||
sion); | n->quota_violation_count); | |||
#endif | /* Discount 32k per violation */ | |||
GNUNET_assert (strlen (cc->address->transport_name) > 0); | GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024); | |||
GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session); | ||||
if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) | ||||
GNUNET_SCHEDULER_cancel (n->ats_suggest); | ||||
n->ats_suggest = | ||||
GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_can | ||||
cel, | ||||
n); | ||||
GNUNET_ATS_suggest_address (GST_ats, &n->id); | ||||
GNUNET_HELLO_address_free (cc->address); | ||||
GNUNET_free (cc); | ||||
return; | ||||
} | } | |||
/* Tell ATS that switching addresses was successful */ | else | |||
switch (n->state) | ||||
{ | { | |||
case S_CONNECTED: | if (n->quota_violation_count > 0) | |||
if (n->address_state == FRESH) | ||||
{ | { | |||
GST_validation_set_address_use (cc->address, cc->session, GNUNET_YES) | /* try to add 32k back */ | |||
; | GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024); | |||
GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0 | n->quota_violation_count--; | |||
); | ||||
if (cc->session != n->session) | ||||
GNUNET_break (0); | ||||
GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_ | ||||
YES); | ||||
n->address_state = USED; | ||||
} | } | |||
break; | } | |||
case S_FAST_RECONNECT: | if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) | |||
#if DEBUG_TRANSPORT | { | |||
GNUNET_STATISTICS_update (GST_stats, | ||||
gettext_noop | ||||
("# bandwidth quota violations by other peers | ||||
"), | ||||
1, GNUNET_NO); | ||||
*do_forward = GNUNET_NO; | ||||
return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT; | ||||
} | ||||
*do_forward = GNUNET_YES; | ||||
ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024); | ||||
if (ret.rel_value > 0) | ||||
{ | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
"Successful fast reconnect to peer `%s'\n", | "Throttling read (%llu bytes excess at %u b/s), waiting %ll | |||
GNUNET_i2s (&n->id)); | u ms before reading more.\n", | |||
#endif | (unsigned long long) n->in_tracker. | |||
change_state (n, S_CONNECTED); | consumption_since_last_update__, | |||
neighbours_connected++; | (unsigned int) n->in_tracker.available_bytes_per_s__, | |||
GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected") | (unsigned long long) ret.rel_value); | |||
, 1, | GNUNET_STATISTICS_update (GST_stats, | |||
GNUNET_NO); | gettext_noop ("# ms throttling suggested"), | |||
(int64_t) ret.rel_value, GNUNET_NO); | ||||
if (n->address_state == FRESH) | ||||
{ | ||||
GST_validation_set_address_use (cc->address, cc->session, GNUNET_YES) | ||||
; | ||||
GNUNET_ATS_address_update (GST_ats, cc->address, cc->session, NULL, 0 | ||||
); | ||||
GNUNET_ATS_address_in_use (GST_ats, cc->address, cc->session, GNUNET_ | ||||
YES); | ||||
n->address_state = USED; | ||||
} | ||||
if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK) | ||||
n->keepalive_task = | ||||
GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task, n); | ||||
/* Updating quotas */ | ||||
GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in); | ||||
send_outbound_quota (target, n->bandwidth_out); | ||||
default: | ||||
break; | ||||
} | } | |||
GNUNET_HELLO_address_free (cc->address); | return ret; | |||
GNUNET_free (cc); | ||||
} | } | |||
/** | /** | |||
* We tried to send a SESSION_CONNECT message to another peer. If this | * Transmit a message to the given target using the active connection. | |||
* succeeded, we change the state. If it failed, we should tell | ||||
* ATS to not use this address anymore (until it is re-validated). | ||||
* | * | |||
* @param cls the 'struct NeighbourMapEntry' | * @param target destination | |||
* @param target peer to send the message to | * @param msg message to send | |||
* @param success GNUNET_OK on success | * @param msg_size number of bytes in msg | |||
* @param timeout when to fail with timeout | ||||
* @param cont function to call when done | ||||
* @param cont_cls closure for 'cont' | ||||
*/ | */ | |||
static void | void | |||
send_connect_ack_continuation (void *cls, | GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void * | |||
const struct GNUNET_PeerIdentity *target, | msg, | |||
int success) | size_t msg_size, struct GNUNET_TIME_Relative timeout, | |||
GST_NeighbourSendContinuation cont, void *cont_cls) | ||||
{ | { | |||
struct ContinutionContext *cc = cls; | ||||
struct NeighbourMapEntry *n; | struct NeighbourMapEntry *n; | |||
struct MessageQueue *mq; | ||||
if (neighbours == NULL) | /* All ove these cases should never happen; they are all API violations. | |||
{ | But we check anyway, just to be sure. */ | |||
GNUNET_HELLO_address_free (cc->address); | if (NULL == (n = lookup_neighbour (target))) | |||
GNUNET_free (cc); | ||||
return; /* neighbour is going away */ | ||||
} | ||||
n = lookup_neighbour (&cc->address->peer); | ||||
if ((n == NULL) || (is_disconnecting (n))) | ||||
{ | { | |||
GNUNET_HELLO_address_free (cc->address); | GNUNET_break (0); | |||
GNUNET_free (cc); | if (NULL != cont) | |||
return; /* neighbour is going away */ | cont (cont_cls, GNUNET_SYSERR); | |||
return; | ||||
} | } | |||
if (GNUNET_YES != test_connected (n)) | ||||
if (GNUNET_YES == success) | ||||
{ | { | |||
GNUNET_HELLO_address_free (cc->address); | GNUNET_break (0); | |||
GNUNET_free (cc); | if (NULL != cont) | |||
return; /* sending successful */ | cont (cont_cls, GNUNET_SYSERR); | |||
return; | ||||
} | } | |||
bytes_in_send_queue += msg_size; | ||||
/* sending failed, ask for next address */ | GNUNET_STATISTICS_set (GST_stats, | |||
#if DEBUG_TRANSPORT | gettext_noop | |||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ("# bytes in message queue for other peers"), | |||
"Failed to send CONNECT_MSG to peer `%4s' with address '%s' s | bytes_in_send_queue, GNUNET_NO); | |||
ession %X, asking ATS for new address \n", | mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size); | |||
GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session | mq->cont = cont; | |||
); | mq->cont_cls = cont_cls; | |||
#endif | memcpy (&mq[1], msg, msg_size); | |||
change_state (n, S_NOT_CONNECTED); | mq->message_buf = (const char *) &mq[1]; | |||
GNUNET_assert (strlen (cc->address->transport_name) > 0); | mq->message_buf_size = msg_size; | |||
GNUNET_ATS_address_destroyed (GST_ats, cc->address, cc->session); | mq->timeout = GNUNET_TIME_relative_to_absolute (timeout); | |||
GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq) | ||||
if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) | ; | |||
GNUNET_SCHEDULER_cancel (n->ats_suggest); | if ( (NULL != n->is_active) || | |||
n->ats_suggest = | ( (NULL == n->primary_address.session) && (NULL == n->primary_addres | |||
GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cance | s.address)) ) | |||
l, | return; | |||
n); | if (GNUNET_SCHEDULER_NO_TASK != n->task) | |||
GNUNET_ATS_suggest_address (GST_ats, &n->id); | GNUNET_SCHEDULER_cancel (n->task); | |||
GNUNET_HELLO_address_free (cc->address); | n->task = GNUNET_SCHEDULER_add_now (&master_task, n); | |||
GNUNET_free (cc); | ||||
} | } | |||
/** | /** | |||
* For an existing neighbour record, set the active connection to | * Send a SESSION_CONNECT message via the given address. | |||
* use the given address. | ||||
* | * | |||
* @param peer identity of the peer to switch the address for | * @param na address to use | |||
* @param address address of the other peer, NULL if other peer | ||||
* connected to us | ||||
* @param session session to use (or NULL) | ||||
* @param ats performance data | ||||
* @param ats_count number of entries in ats | ||||
* @param bandwidth_in inbound quota to be used when connection is up | ||||
* @param bandwidth_out outbound quota to be used when connection is up | ||||
* @return GNUNET_YES if we are currently connected, GNUNET_NO if the | ||||
* connection is not up (yet) | ||||
*/ | */ | |||
int | static void | |||
GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer, | send_session_connect (struct NeighbourAddress *na) | |||
const struct GNUNET_HELLO_Address | ||||
*address, | ||||
struct Session *session, | ||||
const struct GNUNET_ATS_Information | ||||
*ats, | ||||
uint32_t ats_count, | ||||
struct GNUNET_BANDWIDTH_Value32NBO | ||||
bandwidth_in, | ||||
struct GNUNET_BANDWIDTH_Value32NBO | ||||
bandwidth_out) | ||||
{ | { | |||
struct NeighbourMapEntry *n; | struct GNUNET_TRANSPORT_PluginFunctions *papi; | |||
struct SessionConnectMessage connect_msg; | struct SessionConnectMessage connect_msg; | |||
struct ContinutionContext *cc; | ||||
size_t msg_len; | ||||
size_t ret; | ||||
if (neighbours == NULL) | ||||
{ | ||||
/* This can happen during shutdown */ | ||||
return GNUNET_NO; | ||||
} | ||||
n = lookup_neighbour (peer); | ||||
if (NULL == n) | ||||
return GNUNET_NO; | ||||
if (n->state == S_DISCONNECT) | ||||
{ | ||||
/* We are disconnecting, nothing to do here */ | ||||
return GNUNET_NO; | ||||
} | ||||
GNUNET_assert (address->transport_name != NULL); | ||||
if ((session == NULL) && (0 == address->address_length)) | ||||
{ | ||||
GNUNET_break_op (0); | ||||
/* FIXME: is this actually possible? When does this happen? */ | ||||
if (strlen (address->transport_name) > 0) | ||||
GNUNET_ATS_address_destroyed (GST_ats, address, session); | ||||
GNUNET_ATS_suggest_address (GST_ats, peer); | ||||
return GNUNET_NO; | ||||
} | ||||
/* checks successful and neighbour != NULL */ | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"ATS tells us to switch to address '%s' session %p for peer ` | ||||
%s' in state `%s'\n", | ||||
(address->address_length != 0) ? GST_plugins_a2s (address): " | ||||
<inbound>", | ||||
session, | ||||
GNUNET_i2s (peer), | ||||
print_state (n->state)); | ||||
if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) | if (NULL == (papi = GST_plugins_find (na->address->transport_name))) | |||
{ | ||||
GNUNET_SCHEDULER_cancel (n->ats_suggest); | ||||
n->ats_suggest = GNUNET_SCHEDULER_NO_TASK; | ||||
} | ||||
/* do not switch addresses just update quotas */ | ||||
/* | ||||
if (n->state == S_FAST_RECONNECT) | ||||
{ | { | |||
if (0 == GNUNET_HELLO_address_cmp(address, n->address)) | GNUNET_break (0); | |||
{ | return; | |||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||||
"FAST RECONNECT to peer `%s' and address '%s' with ident | ||||
ical ADDRESS\n", | ||||
GNUNET_i2s (&n->id), GST_plugins_a2s (n->address)); | ||||
} | ||||
} | ||||
*/ | ||||
if ((n->state == S_CONNECTED) && (NULL != n->address) && | ||||
(0 == GNUNET_HELLO_address_cmp (address, n->address)) && | ||||
(n->session == session)) | ||||
{ | ||||
n->bandwidth_in = bandwidth_in; | ||||
n->bandwidth_out = bandwidth_out; | ||||
GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in); | ||||
send_outbound_quota (peer, n->bandwidth_out); | ||||
return GNUNET_NO; | ||||
} | } | |||
if (n->state == S_CONNECTED) | if (NULL == na->session) | |||
na->session = papi->get_session (papi->cls, na->address); | ||||
if (NULL == na->session) | ||||
{ | { | |||
/* mark old address as no longer used */ | GNUNET_break (0); | |||
GNUNET_assert (NULL != n->address); | return; | |||
if (n->address_state == USED) | ||||
{ | ||||
GST_validation_set_address_use (n->address, n->session, GNUNET_NO); | ||||
GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO | ||||
); | ||||
n->address_state = UNUSED; | ||||
} | ||||
} | } | |||
na->connect_timestamp = GNUNET_TIME_absolute_get (); | ||||
connect_msg.header.size = htons (sizeof (struct SessionConnectMessage)); | ||||
connect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CO | ||||
NNECT); | ||||
connect_msg.reserved = htonl (0); | ||||
connect_msg.timestamp = GNUNET_TIME_absolute_hton (na->connect_timestamp) | ||||
; | ||||
(void) papi->send (papi->cls, | ||||
na->session, | ||||
(const char *) &connect_msg, sizeof (struct SessionConn | ||||
ectMessage), | ||||
UINT_MAX, | ||||
GNUNET_TIME_UNIT_FOREVER_REL, | ||||
NULL, NULL); | ||||
} | ||||
/* set new address */ | /** | |||
if (NULL != n->address) | * Send a SESSION_CONNECT_ACK message via the given address. | |||
GNUNET_HELLO_address_free (n->address); | * | |||
n->address = GNUNET_HELLO_address_copy (address); | * @param address address to use | |||
n->address_state = FRESH; | * @param session session to use | |||
n->bandwidth_in = bandwidth_in; | * @param timestamp timestamp to use for the ACK message | |||
n->bandwidth_out = bandwidth_out; | */ | |||
GNUNET_SCHEDULER_cancel (n->timeout_task); | static void | |||
n->timeout_task = | send_session_connect_ack_message (const struct GNUNET_HELLO_Address *addres | |||
GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOU | s, | |||
T, | struct Session *session, | |||
&neighbour_timeout_task, n); | struct GNUNET_TIME_Absolute timestamp) | |||
{ | ||||
if (NULL != address_change_cb && n->state == S_CONNECTED) | ||||
address_change_cb (callback_cls, &n->id, n->address); | ||||
/* Obtain an session for this address from plugin */ | ||||
struct GNUNET_TRANSPORT_PluginFunctions *papi; | struct GNUNET_TRANSPORT_PluginFunctions *papi; | |||
papi = GST_plugins_find (address->transport_name); | struct SessionConnectMessage connect_msg; | |||
if (papi == NULL) | ||||
{ | ||||
/* we don't have the plugin for this address */ | ||||
GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL); | ||||
if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) | ||||
GNUNET_SCHEDULER_cancel (n->ats_suggest); | ||||
n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, | ||||
ats_suggest_cancel, | ||||
n); | ||||
GNUNET_ATS_suggest_address (GST_ats, &n->id); | ||||
GNUNET_HELLO_address_free (n->address); | ||||
n->address = NULL; | ||||
n->session = NULL; | ||||
return GNUNET_NO; | ||||
} | ||||
if (session == NULL) | ||||
{ | ||||
n->session = papi->get_session (papi->cls, address); | ||||
/* Session could not be initiated */ | ||||
if (n->session == NULL) | ||||
{ | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Failed to obtain new session %p for peer `%s' and addre | ||||
ss '%s'\n", | ||||
n->session, GNUNET_i2s (&n->id), GST_plugins_a2s (n->addr | ||||
ess)); | ||||
GNUNET_ATS_address_destroyed (GST_ats, n->address, NULL); | ||||
if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) | ||||
GNUNET_SCHEDULER_cancel (n->ats_suggest); | ||||
n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, | ||||
ats_suggest_cancel, | ||||
n); | ||||
GNUNET_ATS_suggest_address (GST_ats, &n->id); | ||||
GNUNET_HELLO_address_free (n->address); | ||||
n->address = NULL; | ||||
n->session = NULL; | ||||
return GNUNET_NO; | ||||
} | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | if (NULL == (papi = GST_plugins_find (address->transport_name))) | |||
"Obtained new session %p for peer `%s' and address '%s'\n" | ||||
, | ||||
n->session, GNUNET_i2s (&n->id), GST_plugins_a2s (n->addre | ||||
ss)); | ||||
/* Telling ATS about new session */ | ||||
GNUNET_ATS_address_update (GST_ats, n->address, n->session, NULL, 0); | ||||
} | ||||
else | ||||
{ | { | |||
n->session = session; | GNUNET_break (0); | |||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | return; | |||
"Using existing session %p for peer `%s' and address '%s'\ | ||||
n", | ||||
n->session, | ||||
GNUNET_i2s (&n->id), | ||||
(address->address_length != 0) ? GST_plugins_a2s (address): | ||||
"<inbound>"); | ||||
} | } | |||
if (NULL == session) | ||||
switch (n->state) | session = papi->get_session (papi->cls, address); | |||
if (NULL == session) | ||||
{ | { | |||
case S_NOT_CONNECTED: | GNUNET_break (0); | |||
case S_CONNECT_SENT: | return; | |||
msg_len = sizeof (struct SessionConnectMessage); | ||||
connect_msg.header.size = htons (msg_len); | ||||
connect_msg.header.type = | ||||
htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT); | ||||
connect_msg.reserved = htonl (0); | ||||
connect_msg.timestamp = | ||||
GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); | ||||
cc = GNUNET_malloc (sizeof (struct ContinutionContext)); | ||||
cc->session = n->session; | ||||
cc->address = GNUNET_HELLO_address_copy (address); | ||||
ret = send_with_session (n, | ||||
(const char *) &connect_msg, msg_len, | ||||
UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, | ||||
&send_connect_continuation, cc); | ||||
return GNUNET_NO; | ||||
case S_CONNECT_RECV: | ||||
/* We received a CONNECT message and asked ATS for an address */ | ||||
msg_len = sizeof (struct SessionConnectMessage); | ||||
connect_msg.header.size = htons (msg_len); | ||||
connect_msg.header.type = | ||||
htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT_ACK); | ||||
connect_msg.reserved = htonl (0); | ||||
connect_msg.timestamp = | ||||
GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); | ||||
cc = GNUNET_malloc (sizeof (struct ContinutionContext)); | ||||
cc->session = n->session; | ||||
cc->address = GNUNET_HELLO_address_copy (address); | ||||
ret = send_with_session(n, | ||||
(const void *) &connect_msg, msg_len, | ||||
UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, | ||||
&send_connect_ack_continuation, | ||||
cc); | ||||
return GNUNET_NO; | ||||
case S_CONNECTED: | ||||
case S_FAST_RECONNECT: | ||||
/* connected peer is switching addresses or tries fast reconnect */ | ||||
msg_len = sizeof (struct SessionConnectMessage); | ||||
connect_msg.header.size = htons (msg_len); | ||||
connect_msg.header.type = | ||||
htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CONNECT); | ||||
connect_msg.reserved = htonl (0); | ||||
connect_msg.timestamp = | ||||
GNUNET_TIME_absolute_hton (GNUNET_TIME_absolute_get ()); | ||||
cc = GNUNET_malloc (sizeof (struct ContinutionContext)); | ||||
cc->session = n->session; | ||||
cc->address = GNUNET_HELLO_address_copy (address); | ||||
ret = send_with_session(n, | ||||
(const void *) &connect_msg, msg_len, | ||||
UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, | ||||
&send_switch_address_continuation, cc); | ||||
if (ret == GNUNET_SYSERR) | ||||
{ | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Failed to send CONNECT_MESSAGE to `%4s' using address '% | ||||
s' session %X\n", | ||||
GNUNET_i2s (peer), GST_plugins_a2s (address), session); | ||||
} | ||||
return GNUNET_NO; | ||||
default: | ||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, | ||||
"Invalid connection state to switch addresses %u \n", n->st | ||||
ate); | ||||
GNUNET_break_op (0); | ||||
return GNUNET_NO; | ||||
} | } | |||
connect_msg.header.size = htons (sizeof (struct SessionConnectMessage)); | ||||
connect_msg.header.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_CO | ||||
NNECT_ACK); | ||||
connect_msg.reserved = htonl (0); | ||||
connect_msg.timestamp = GNUNET_TIME_absolute_hton (timestamp); | ||||
(void) papi->send (papi->cls, | ||||
session, | ||||
(const char *) &connect_msg, sizeof (struct SessionConn | ||||
ectMessage), | ||||
UINT_MAX, | ||||
GNUNET_TIME_UNIT_FOREVER_REL, | ||||
NULL, NULL); | ||||
} | } | |||
/** | /** | |||
* Obtain current latency information for the given neighbour. | * Create a fresh entry in the neighbour map for the given peer | |||
* | ||||
* @param peer | ||||
* @return observed latency of the address, FOREVER if the address was | ||||
* never successfully validated | ||||
*/ | ||||
struct GNUNET_TIME_Relative | ||||
GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer) | ||||
{ | ||||
struct NeighbourMapEntry *n; | ||||
n = lookup_neighbour (peer); | ||||
if ((NULL == n) || ((n->address == NULL) && (n->session == NULL))) | ||||
return GNUNET_TIME_UNIT_FOREVER_REL; | ||||
return n->latency; | ||||
} | ||||
/** | ||||
* Obtain current address information for the given neighbour. | ||||
* | ||||
* @param peer | ||||
* @return address currently used | ||||
*/ | ||||
struct GNUNET_HELLO_Address * | ||||
GST_neighbour_get_current_address (const struct GNUNET_PeerIdentity *peer) | ||||
{ | ||||
struct NeighbourMapEntry *n; | ||||
n = lookup_neighbour (peer); | ||||
if ((NULL == n) || ((n->address == NULL) && (n->session == NULL))) | ||||
return NULL; | ||||
return n->address; | ||||
} | ||||
/** | ||||
* Create an entry in the neighbour map for the given peer | ||||
* | * | |||
* @param peer peer to create an entry for | * @param peer peer to create an entry for | |||
* @return new neighbour map entry | * @return new neighbour map entry | |||
*/ | */ | |||
static struct NeighbourMapEntry * | static struct NeighbourMapEntry * | |||
setup_neighbour (const struct GNUNET_PeerIdentity *peer) | setup_neighbour (const struct GNUNET_PeerIdentity *peer) | |||
{ | { | |||
struct NeighbourMapEntry *n; | struct NeighbourMapEntry *n; | |||
#if DEBUG_TRANSPORT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
"Unknown peer `%s', creating new neighbour\n", GNUNET_i2s (pe | "Creating new neighbour entry for `%s'\n", | |||
er)); | GNUNET_i2s (peer)); | |||
#endif | ||||
n = GNUNET_malloc (sizeof (struct NeighbourMapEntry)); | n = GNUNET_malloc (sizeof (struct NeighbourMapEntry)); | |||
n->id = *peer; | n->id = *peer; | |||
n->state = S_NOT_CONNECTED; | n->state = S_NOT_CONNECTED; | |||
n->latency = GNUNET_TIME_relative_get_forever (); | n->latency = GNUNET_TIME_UNIT_FOREVER_REL; | |||
GNUNET_BANDWIDTH_tracker_init (&n->in_tracker, | GNUNET_BANDWIDTH_tracker_init (&n->in_tracker, | |||
GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, | GNUNET_CONSTANTS_DEFAULT_BW_IN_OUT, | |||
MAX_BANDWIDTH_CARRY_S); | MAX_BANDWIDTH_CARRY_S); | |||
n->timeout_task = | n->task = GNUNET_SCHEDULER_add_now (&master_task, n); | |||
GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOU | ||||
T, | ||||
&neighbour_timeout_task, n); | ||||
GNUNET_assert (GNUNET_OK == | GNUNET_assert (GNUNET_OK == | |||
GNUNET_CONTAINER_multihashmap_put (neighbours, | GNUNET_CONTAINER_multihashmap_put (neighbours, | |||
&n->id.hashPubKey, n, | &n->id.hashPubKey, n, | |||
GNUNET_CONTAINER_MULTIH ASHMAPOPTION_UNIQUE_ONLY)); | GNUNET_CONTAINER_MULTIH ASHMAPOPTION_UNIQUE_ONLY)); | |||
return n; | return n; | |||
} | } | |||
/** | /** | |||
* Check if the two given addresses are the same. | ||||
* Actually only checks if the sessions are non-NULL | ||||
* (which they should be) and then if they are identical; | ||||
* the actual addresses don't matter if the session | ||||
* pointers match anyway, and we must have session pointers | ||||
* at this time. | ||||
* | ||||
* @param a1 first address to compare | ||||
* @param a2 other address to compare | ||||
* @return GNUNET_NO if the addresses do not match, GNUNET_YES if they do m | ||||
atch | ||||
*/ | ||||
static int | ||||
address_matches (const struct NeighbourAddress *a1, | ||||
const struct NeighbourAddress *a2) | ||||
{ | ||||
if ( (NULL == a1->session) || | ||||
(NULL == a2->session) ) | ||||
{ | ||||
GNUNET_break (0); | ||||
return 0; | ||||
} | ||||
return (a1->session == a2->session) ? GNUNET_YES : GNUNET_NO; | ||||
} | ||||
/** | ||||
* Try to create a connection to the given target (eventually). | * Try to create a connection to the given target (eventually). | |||
* | * | |||
* @param target peer to try to connect to | * @param target peer to try to connect to | |||
*/ | */ | |||
void | void | |||
GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target) | GST_neighbours_try_connect (const struct GNUNET_PeerIdentity *target) | |||
{ | { | |||
struct NeighbourMapEntry *n; | struct NeighbourMapEntry *n; | |||
// This can happen during shutdown | if (NULL == neighbours) | |||
if (neighbours == NULL) | return; /* during shutdown, do nothing */ | |||
{ | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
return; | "Asked to connect to peer `%s'\n", | |||
} | ||||
#if DEBUG_TRANSPORT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying to connect to peer `%s'\n", | ||||
GNUNET_i2s (target)); | GNUNET_i2s (target)); | |||
#endif | ||||
if (0 == | if (0 == | |||
memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity) )) | memcmp (target, &GST_my_identity, sizeof (struct GNUNET_PeerIdentity) )) | |||
{ | { | |||
/* my own hello */ | /* refuse to connect to myself */ | |||
/* FIXME: can this happen? Is this not an API violation? */ | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Refusing to try to connect to myself.\n"); | ||||
return; | return; | |||
} | } | |||
n = lookup_neighbour (target); | n = lookup_neighbour (target); | |||
if (NULL != n) | if (NULL != n) | |||
{ | { | |||
if ((S_CONNECTED == n->state) || (is_connecting (n))) | switch (n->state) | |||
return; /* already connecting or connected */ | { | |||
if (is_disconnecting (n)) | case S_NOT_CONNECTED: | |||
change_state (n, S_NOT_CONNECTED); | /* this should not be possible */ | |||
GNUNET_break (0); | ||||
free_neighbour (n, GNUNET_NO); | ||||
break; | ||||
case S_INIT_ATS: | ||||
case S_INIT_BLACKLIST: | ||||
case S_CONNECT_SENT: | ||||
case S_CONNECT_RECV_ATS: | ||||
case S_CONNECT_RECV_BLACKLIST: | ||||
case S_CONNECT_RECV_ACK: | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Ignoring request to try to connect to `%s', already tryin | ||||
g!\n", | ||||
GNUNET_i2s (target)); | ||||
return; /* already trying */ | ||||
case S_CONNECTED: | ||||
case S_RECONNECT_ATS: | ||||
case S_RECONNECT_BLACKLIST: | ||||
case S_RECONNECT_SENT: | ||||
case S_CONNECTED_SWITCHING_BLACKLIST: | ||||
case S_CONNECTED_SWITCHING_CONNECT_SENT: | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Ignoring request to try to connect, already connected to | ||||
`%s'!\n", | ||||
GNUNET_i2s (target)); | ||||
return; /* already connected */ | ||||
case S_DISCONNECT: | ||||
/* get rid of remains, ready to re-try immediately */ | ||||
free_neighbour (n, GNUNET_NO); | ||||
break; | ||||
case S_DISCONNECT_FINISHED: | ||||
/* should not be possible */ | ||||
GNUNET_assert (0); | ||||
default: | ||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_ | ||||
state (n->state)); | ||||
GNUNET_break (0); | ||||
free_neighbour (n, GNUNET_NO); | ||||
break; | ||||
} | ||||
} | } | |||
n = setup_neighbour (target); | ||||
n->state = S_INIT_ATS; | ||||
n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); | ||||
if (n == NULL) | GNUNET_ATS_reset_backoff (GST_ats, target); | |||
n = setup_neighbour (target); | GNUNET_ATS_suggest_address (GST_ats, target); | |||
#if DEBUG_TRANSPORT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Asking ATS for suggested address to connect to peer `%s'\n", | ||||
GNUNET_i2s (&n->id)); | ||||
#endif | ||||
GNUNET_ATS_suggest_address (GST_ats, &n->id); | ||||
} | } | |||
/** | /** | |||
* Test if we're connected to the given peer. | * Function called with the result of a blacklist check. | |||
* | * | |||
* @param target peer to test | * @param cls closure with the 'struct BlackListCheckContext' | |||
* @return GNUNET_YES if we are connected, GNUNET_NO if not | * @param peer peer this check affects | |||
* @param result GNUNET_OK if the address is allowed | ||||
*/ | */ | |||
int | static void | |||
GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target) | handle_test_blacklist_cont (void *cls, | |||
const struct GNUNET_PeerIdentity *peer, | ||||
int result) | ||||
{ | { | |||
struct BlackListCheckContext *bcc = cls; | ||||
struct NeighbourMapEntry *n; | struct NeighbourMapEntry *n; | |||
// This can happen during shutdown | bcc->bc = NULL; | |||
if (neighbours == NULL) | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
"Connection to new address of peer `%s' based on blacklist is | ||||
`%s'\n", | ||||
GNUNET_i2s (peer), | ||||
(GNUNET_OK == result) ? "allowed" : "FORBIDDEN"); | ||||
if (GNUNET_OK == result) | ||||
{ | { | |||
return GNUNET_NO; | /* valid new address, let ATS know! */ | |||
GNUNET_ATS_address_update (GST_ats, | ||||
bcc->na.address, | ||||
bcc->na.session, | ||||
bcc->ats, bcc->ats_count); | ||||
} | } | |||
if (NULL == (n = lookup_neighbour (peer))) | ||||
n = lookup_neighbour (target); | goto cleanup; /* nobody left to care about new address */ | |||
switch (n->state) | ||||
if ((NULL == n) || (S_CONNECTED != n->state)) | { | |||
return GNUNET_NO; /* not connected */ | case S_NOT_CONNECTED: | |||
return GNUNET_YES; | /* this should not be possible */ | |||
GNUNET_break (0); | ||||
free_neighbour (n, GNUNET_NO); | ||||
break; | ||||
case S_INIT_ATS: | ||||
/* still waiting on ATS suggestion */ | ||||
break; | ||||
case S_INIT_BLACKLIST: | ||||
/* check if the address the blacklist was fine with matches | ||||
ATS suggestion, if so, we can move on! */ | ||||
if ( (GNUNET_OK == result) && | ||||
(1 == n->send_connect_ack) ) | ||||
{ | ||||
n->send_connect_ack = 2; | ||||
send_session_connect_ack_message (bcc->na.address, | ||||
bcc->na.session, | ||||
n->connect_ack_timestamp); | ||||
} | ||||
if (GNUNET_YES != address_matches (&bcc->na, &n->primary_address)) | ||||
break; /* result for an address we currently don't care about */ | ||||
if (GNUNET_OK == result) | ||||
{ | ||||
n->timeout = GNUNET_TIME_relative_to_absolute (SETUP_CONNECTION_TIMEO | ||||
UT); | ||||
n->state = S_CONNECT_SENT; | ||||
send_session_connect (&n->primary_address); | ||||
} | ||||
else | ||||
{ | ||||
// FIXME: should also possibly destroy session with plugin!? | ||||
GNUNET_ATS_address_destroyed (GST_ats, | ||||
bcc->na.address, | ||||
NULL); | ||||
free_address (&n->primary_address); | ||||
n->state = S_INIT_ATS; | ||||
n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); | ||||
// FIXME: do we need to ask ATS again for suggestions? | ||||
GNUNET_ATS_suggest_address (GST_ats, &n->id); | ||||
} | ||||
break; | ||||
case S_CONNECT_SENT: | ||||
/* waiting on CONNECT_ACK, send ACK if one is pending */ | ||||
if ( (GNUNET_OK == result) && | ||||
(1 == n->send_connect_ack) ) | ||||
{ | ||||
n->send_connect_ack = 2; | ||||
send_session_connect_ack_message (n->primary_address.address, | ||||
n->primary_address.session, | ||||
n->connect_ack_timestamp); | ||||
} | ||||
break; | ||||
case S_CONNECT_RECV_ATS: | ||||
/* still waiting on ATS suggestion, don't care about blacklist */ | ||||
break; | ||||
case S_CONNECT_RECV_BLACKLIST: | ||||
if (GNUNET_YES != address_matches (&bcc->na, &n->primary_address)) | ||||
break; /* result for an address we currently don't care about */ | ||||
if (GNUNET_OK == result) | ||||
{ | ||||
n->timeout = GNUNET_TIME_relative_to_absolute (SETUP_CONNECTION_TIMEO | ||||
UT); | ||||
n->state = S_CONNECT_RECV_ACK; | ||||
send_session_connect_ack_message (bcc->na.address, | ||||
bcc->na.session, | ||||
n->connect_ack_timestamp); | ||||
if (1 == n->send_connect_ack) | ||||
n->send_connect_ack = 2; | ||||
} | ||||
else | ||||
{ | ||||
// FIXME: should also possibly destroy session with plugin!? | ||||
GNUNET_ATS_address_destroyed (GST_ats, | ||||
bcc->na.address, | ||||
NULL); | ||||
free_address (&n->primary_address); | ||||
n->state = S_INIT_ATS; | ||||
n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); | ||||
// FIXME: do we need to ask ATS again for suggestions? | ||||
GNUNET_ATS_reset_backoff (GST_ats, peer); | ||||
GNUNET_ATS_suggest_address (GST_ats, &n->id); | ||||
} | ||||
break; | ||||
case S_CONNECT_RECV_ACK: | ||||
/* waiting on SESSION_ACK, send ACK if one is pending */ | ||||
if ( (GNUNET_OK == result) && | ||||
(1 == n->send_connect_ack) ) | ||||
{ | ||||
n->send_connect_ack = 2; | ||||
send_session_connect_ack_message (n->primary_address.address, | ||||
n->primary_address.session, | ||||
n->connect_ack_timestamp); | ||||
} | ||||
break; | ||||
case S_CONNECTED: | ||||
/* already connected, don't care about blacklist */ | ||||
break; | ||||
case S_RECONNECT_ATS: | ||||
/* still waiting on ATS suggestion, don't care about blacklist */ | ||||
break; | ||||
case S_RECONNECT_BLACKLIST: | ||||
if ( (GNUNET_OK == result) && | ||||
(1 == n->send_connect_ack) ) | ||||
{ | ||||
n->send_connect_ack = 2; | ||||
send_session_connect_ack_message (bcc->na.address, | ||||
bcc->na.session, | ||||
n->connect_ack_timestamp); | ||||
} | ||||
if (GNUNET_YES != address_matches (&bcc->na, &n->primary_address)) | ||||
break; /* result for an address we currently don't care about */ | ||||
if (GNUNET_OK == result) | ||||
{ | ||||
send_session_connect (&n->primary_address); | ||||
n->timeout = GNUNET_TIME_relative_to_absolute (FAST_RECONNECT_TIMEOUT | ||||
); | ||||
n->state = S_RECONNECT_SENT; | ||||
} | ||||
else | ||||
{ | ||||
GNUNET_ATS_address_destroyed (GST_ats, | ||||
bcc->na.address, | ||||
NULL); | ||||
n->state = S_RECONNECT_ATS; | ||||
n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); | ||||
// FIXME: do we need to ask ATS again for suggestions? | ||||
GNUNET_ATS_suggest_address (GST_ats, &n->id); | ||||
} | ||||
break; | ||||
case S_RECONNECT_SENT: | ||||
/* waiting on CONNECT_ACK, don't care about blacklist */ | ||||
if ( (GNUNET_OK == result) && | ||||
(1 == n->send_connect_ack) ) | ||||
{ | ||||
n->send_connect_ack = 2; | ||||
send_session_connect_ack_message (n->primary_address.address, | ||||
n->primary_address.session, | ||||
n->connect_ack_timestamp); | ||||
} | ||||
break; | ||||
case S_CONNECTED_SWITCHING_BLACKLIST: | ||||
if (GNUNET_YES != address_matches (&bcc->na, &n->alternative_address)) | ||||
break; /* result for an address we currently don't care about */ | ||||
if (GNUNET_OK == result) | ||||
{ | ||||
send_session_connect (&n->alternative_address); | ||||
n->state = S_CONNECTED_SWITCHING_CONNECT_SENT; | ||||
} | ||||
else | ||||
{ | ||||
GNUNET_ATS_address_destroyed (GST_ats, | ||||
bcc->na.address, | ||||
NULL); | ||||
free_address (&n->alternative_address); | ||||
n->state = S_CONNECTED; | ||||
} | ||||
break; | ||||
case S_CONNECTED_SWITCHING_CONNECT_SENT: | ||||
/* waiting on CONNECT_ACK, don't care about blacklist */ | ||||
if ( (GNUNET_OK == result) && | ||||
(1 == n->send_connect_ack) ) | ||||
{ | ||||
n->send_connect_ack = 2; | ||||
send_session_connect_ack_message (n->primary_address.address, | ||||
n->primary_address.session, | ||||
n->connect_ack_timestamp); | ||||
} | ||||
break; | ||||
case S_DISCONNECT: | ||||
/* Nothing to do here, ATS will already do what can be done */ | ||||
break; | ||||
case S_DISCONNECT_FINISHED: | ||||
/* should not be possible */ | ||||
GNUNET_assert (0); | ||||
break; | ||||
default: | ||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_st | ||||
ate (n->state)); | ||||
GNUNET_break (0); | ||||
free_neighbour (n, GNUNET_NO); | ||||
break; | ||||
} | ||||
cleanup: | ||||
GNUNET_HELLO_address_free (bcc->na.address); | ||||
GNUNET_CONTAINER_DLL_remove (bc_head, | ||||
bc_tail, | ||||
bcc); | ||||
GNUNET_free (bcc); | ||||
} | } | |||
/** | /** | |||
* A session was terminated. Take note. | * We want to know if connecting to a particular peer via | |||
* a particular address is allowed. Check it! | ||||
* | * | |||
* @param peer identity of the peer where the session died | * @param peer identity of the peer to switch the address for | |||
* @param session session that is gone | * @param ts time at which the check was initiated | |||
* @param address address of the other peer, NULL if other peer | ||||
* connected to us | ||||
* @param session session to use (or NULL) | ||||
* @param ats performance data | ||||
* @param ats_count number of entries in ats (excluding 0-termination) | ||||
*/ | */ | |||
void | static void | |||
GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer, | check_blacklist (const struct GNUNET_PeerIdentity *peer, | |||
struct Session *session) | struct GNUNET_TIME_Absolute ts, | |||
const struct GNUNET_HELLO_Address *address, | ||||
struct Session *session, | ||||
const struct GNUNET_ATS_Information *ats, | ||||
uint32_t ats_count) | ||||
{ | { | |||
struct NeighbourMapEntry *n; | struct BlackListCheckContext *bcc; | |||
struct GST_BlacklistCheck *bc; | ||||
if (neighbours == NULL) | ||||
{ | ||||
/* This can happen during shutdown */ | ||||
return; | ||||
} | ||||
#if DEBUG_TRANSPORT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Session %X to peer `%s' ended \n", | ||||
session, GNUNET_i2s (peer)); | ||||
#endif | ||||
n = lookup_neighbour (peer); | bcc = | |||
if (NULL == n) | GNUNET_malloc (sizeof (struct BlackListCheckContext) + | |||
return; | sizeof (struct GNUNET_ATS_Information) * ats_count); | |||
if (session != n->session) | bcc->ats_count = ats_count; | |||
return; /* doesn't affect us */ | bcc->na.address = GNUNET_HELLO_address_copy (address); | |||
if (n->state == S_CONNECTED) | bcc->na.session = session; | |||
{ | bcc->na.connect_timestamp = ts; | |||
if (n->address_state == USED) | bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1]; | |||
{ | memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count | |||
GST_validation_set_address_use (n->address, n->session, GNUNET_NO); | ); | |||
GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_NO | GNUNET_CONTAINER_DLL_insert (bc_head, | |||
); | bc_tail, | |||
n->address_state = UNUSED; | bcc); | |||
} | if (NULL != (bc = GST_blacklist_test_allowed (peer, | |||
} | address->transport_name, | |||
&handle_test_blacklist_cont, | ||||
bcc))) | ||||
bcc->bc = bc; | ||||
/* if NULL == bc, 'cont' was already called and 'bcc' already free'd, so | ||||
we must only store 'bc' if 'bc' is non-NULL... */ | ||||
} | ||||
if (NULL != n->address) | /** | |||
{ | * We received a 'SESSION_CONNECT' message from the other peer. | |||
GNUNET_HELLO_address_free (n->address); | * Consider switching to it. | |||
n->address = NULL; | * | |||
} | * @param message possibly a 'struct SessionConnectMessage' (check format) | |||
n->session = NULL; | * @param peer identity of the peer to switch the address for | |||
* @param address address of the other peer, NULL if other peer | ||||
* connected to us | ||||
* @param session session to use (or NULL) | ||||
* @param ats performance data | ||||
* @param ats_count number of entries in ats (excluding 0-termination) | ||||
*/ | ||||
void | ||||
GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message, | ||||
const struct GNUNET_PeerIdentity *peer, | ||||
const struct GNUNET_HELLO_Address *address, | ||||
struct Session *session, | ||||
const struct GNUNET_ATS_Information *ats, | ||||
uint32_t ats_count) | ||||
{ | ||||
const struct SessionConnectMessage *scm; | ||||
struct NeighbourMapEntry *n; | ||||
struct GNUNET_TIME_Absolute ts; | ||||
/* not connected anymore anyway, shouldn't matter */ | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
if (S_CONNECTED != n->state) | "Received CONNECT message from peer `%s'\n", | |||
GNUNET_i2s (peer)); | ||||
if (ntohs (message->size) != sizeof (struct SessionConnectMessage)) | ||||
{ | ||||
GNUNET_break_op (0); | ||||
return; | return; | |||
} | ||||
if (n->keepalive_task != GNUNET_SCHEDULER_NO_TASK) | if (NULL == neighbours) | |||
return; /* we're shutting down */ | ||||
scm = (const struct SessionConnectMessage *) message; | ||||
GNUNET_break_op (0 == ntohl (scm->reserved)); | ||||
ts = GNUNET_TIME_absolute_ntoh (scm->timestamp); | ||||
n = lookup_neighbour (peer); | ||||
if (NULL == n) | ||||
n = setup_neighbour (peer); | ||||
n->send_connect_ack = 1; | ||||
n->connect_ack_timestamp = ts; | ||||
switch (n->state) | ||||
{ | { | |||
GNUNET_SCHEDULER_cancel (n->keepalive_task); | case S_NOT_CONNECTED: | |||
n->keepalive_task = GNUNET_SCHEDULER_NO_TASK; | n->state = S_CONNECT_RECV_ATS; | |||
n->expect_latency_response = GNUNET_NO; | n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); | |||
GNUNET_ATS_reset_backoff (GST_ats, peer); | ||||
GNUNET_ATS_suggest_address (GST_ats, peer); | ||||
check_blacklist (peer, ts, address, session, ats, ats_count); | ||||
break; | ||||
case S_INIT_ATS: | ||||
case S_INIT_BLACKLIST: | ||||
case S_CONNECT_SENT: | ||||
case S_CONNECT_RECV_ATS: | ||||
case S_CONNECT_RECV_BLACKLIST: | ||||
case S_CONNECT_RECV_ACK: | ||||
/* It can never hurt to have an alternative address in the above cases, | ||||
see if it is allowed */ | ||||
check_blacklist (peer, ts, address, session, ats, ats_count); | ||||
break; | ||||
case S_CONNECTED: | ||||
/* we are already connected and can thus send the ACK immediately; | ||||
still, it can never hurt to have an alternative address, so also | ||||
tell ATS about it */ | ||||
GNUNET_assert (NULL != n->primary_address.address); | ||||
GNUNET_assert (NULL != n->primary_address.session); | ||||
n->send_connect_ack = 0; | ||||
send_session_connect_ack_message (n->primary_address.address, | ||||
n->primary_address.session, ts); | ||||
check_blacklist (peer, ts, address, session, ats, ats_count); | ||||
break; | ||||
case S_RECONNECT_ATS: | ||||
case S_RECONNECT_BLACKLIST: | ||||
case S_RECONNECT_SENT: | ||||
/* It can never hurt to have an alternative address in the above cases, | ||||
see if it is allowed */ | ||||
check_blacklist (peer, ts, address, session, ats, ats_count); | ||||
break; | ||||
case S_CONNECTED_SWITCHING_BLACKLIST: | ||||
case S_CONNECTED_SWITCHING_CONNECT_SENT: | ||||
/* we are already connected and can thus send the ACK immediately; | ||||
still, it can never hurt to have an alternative address, so also | ||||
tell ATS about it */ | ||||
GNUNET_assert (NULL != n->primary_address.address); | ||||
GNUNET_assert (NULL != n->primary_address.session); | ||||
n->send_connect_ack = 0; | ||||
send_session_connect_ack_message (n->primary_address.address, | ||||
n->primary_address.session, ts); | ||||
check_blacklist (peer, ts, address, session, ats, ats_count); | ||||
break; | ||||
case S_DISCONNECT: | ||||
/* get rid of remains without terminating sessions, ready to re-try */ | ||||
free_neighbour (n, GNUNET_YES); | ||||
n = setup_neighbour (peer); | ||||
n->state = S_CONNECT_RECV_ATS; | ||||
GNUNET_ATS_reset_backoff (GST_ats, peer); | ||||
GNUNET_ATS_suggest_address (GST_ats, peer); | ||||
break; | ||||
case S_DISCONNECT_FINISHED: | ||||
/* should not be possible */ | ||||
GNUNET_assert (0); | ||||
break; | ||||
default: | ||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_st | ||||
ate (n->state)); | ||||
GNUNET_break (0); | ||||
free_neighbour (n, GNUNET_NO); | ||||
break; | ||||
} | } | |||
/* connected, try fast reconnect */ | ||||
/* statistics "transport" : "# peers connected" -= 1 | ||||
* neighbours_connected -= 1 | ||||
* BUT: no disconnect_cb to notify clients about disconnect | ||||
*/ | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Trying fast reconnect to peer `%s'\ | ||||
n", | ||||
GNUNET_i2s (peer)); | ||||
GNUNET_assert (neighbours_connected > 0); | ||||
change_state (n, S_FAST_RECONNECT); | ||||
neighbours_connected--; | ||||
GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), | ||||
-1, | ||||
GNUNET_NO); | ||||
/* We are connected, so ask ATS to switch addresses */ | ||||
GNUNET_SCHEDULER_cancel (n->timeout_task); | ||||
n->timeout_task = GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_DISCONNE | ||||
CT_SESSION_TIMEOUT, | ||||
&neighbour_timeout_task, n); | ||||
/* try QUICKLY to re-establish a connection, reduce timeout! */ | ||||
if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) | ||||
GNUNET_SCHEDULER_cancel (n->ats_suggest); | ||||
n->ats_suggest = GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, | ||||
&ats_suggest_cancel, | ||||
n); | ||||
GNUNET_ATS_suggest_address (GST_ats, peer); | ||||
} | } | |||
/** | /** | |||
* Transmit a message to the given target using the active connection. | * For an existing neighbour record, set the active connection to | |||
* use the given address. | ||||
* | * | |||
* @param target destination | * @param peer identity of the peer to switch the address for | |||
* @param msg message to send | * @param address address of the other peer, NULL if other peer | |||
* @param msg_size number of bytes in msg | * connected to us | |||
* @param timeout when to fail with timeout | * @param session session to use (or NULL) | |||
* @param cont function to call when done | * @param ats performance data | |||
* @param cont_cls closure for 'cont' | * @param ats_count number of entries in ats | |||
* @param bandwidth_in inbound quota to be used when connection is up | ||||
* @param bandwidth_out outbound quota to be used when connection is up | ||||
*/ | */ | |||
void | void | |||
GST_neighbours_send (const struct GNUNET_PeerIdentity *target, const void * | GST_neighbours_switch_to_address (const struct GNUNET_PeerIdentity *peer, | |||
msg, | const struct GNUNET_HELLO_Address *address | |||
size_t msg_size, struct GNUNET_TIME_Relative timeout, | , | |||
GST_NeighbourSendContinuation cont, void *cont_cls) | struct Session *session, | |||
const struct GNUNET_ATS_Information *ats, | ||||
uint32_t ats_count, | ||||
struct GNUNET_BANDWIDTH_Value32NBO | ||||
bandwidth_in, | ||||
struct GNUNET_BANDWIDTH_Value32NBO | ||||
bandwidth_out) | ||||
{ | { | |||
struct NeighbourMapEntry *n; | struct NeighbourMapEntry *n; | |||
struct MessageQueue *mq; | struct GNUNET_TRANSPORT_PluginFunctions *papi; | |||
// This can happen during shutdown | GNUNET_assert (address->transport_name != NULL); | |||
if (neighbours == NULL) | if (NULL == (n = lookup_neighbour (peer))) | |||
return; | ||||
/* Obtain an session for this address from plugin */ | ||||
if (NULL == (papi = GST_plugins_find (address->transport_name))) | ||||
{ | { | |||
/* we don't have the plugin for this address */ | ||||
GNUNET_ATS_address_destroyed (GST_ats, address, NULL); | ||||
return; | return; | |||
} | } | |||
if ((NULL == session) && (0 == address->address_length)) | ||||
n = lookup_neighbour (target); | ||||
if ((n == NULL) || (!is_connected (n))) | ||||
{ | { | |||
GNUNET_STATISTICS_update (GST_stats, | GNUNET_break (0); | |||
gettext_noop | if (strlen (address->transport_name) > 0) | |||
("# messages not sent (no such peer or not co | GNUNET_ATS_address_destroyed (GST_ats, address, session); | |||
nnected)"), | ||||
1, GNUNET_NO); | ||||
#if DEBUG_TRANSPORT | ||||
if (n == NULL) | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Could not send message to peer `%s': unknown neighbour", | ||||
GNUNET_i2s (target)); | ||||
else if (!is_connected (n)) | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Could not send message to peer `%s': not connected\n", | ||||
GNUNET_i2s (target)); | ||||
#endif | ||||
if (NULL != cont) | ||||
cont (cont_cls, GNUNET_SYSERR); | ||||
return; | return; | |||
} | } | |||
if (NULL == session) | ||||
if ((n->session == NULL) && (n->address == NULL)) | session = papi->get_session (papi->cls, address); | |||
if (NULL == session) | ||||
{ | { | |||
GNUNET_STATISTICS_update (GST_stats, | ||||
gettext_noop | ||||
("# messages not sent (no such peer or not co | ||||
nnected)"), | ||||
1, GNUNET_NO); | ||||
#if DEBUG_TRANSPORT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
"Could not send message to peer `%s': no address available\ | "Failed to obtain new session for peer `%s' and address '%s | |||
n", | '\n", | |||
GNUNET_i2s (target)); | GNUNET_i2s (&address->peer), GST_plugins_a2s (address)); | |||
#endif | GNUNET_ATS_address_destroyed (GST_ats, address, NULL); | |||
if (NULL != cont) | ||||
cont (cont_cls, GNUNET_SYSERR); | ||||
return; | return; | |||
} | } | |||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
GNUNET_assert (msg_size >= sizeof (struct GNUNET_MessageHeader)); | "ATS tells us to switch to address '%s' for peer `%s'\n", | |||
GNUNET_STATISTICS_update (GST_stats, | (address->address_length != 0) ? GST_plugins_a2s (address): " | |||
gettext_noop | <inbound>", | |||
("# bytes in message queue for other peers"), | GNUNET_i2s (peer)); | |||
msg_size, GNUNET_NO); | switch (n->state) | |||
mq = GNUNET_malloc (sizeof (struct MessageQueue) + msg_size); | { | |||
mq->cont = cont; | case S_NOT_CONNECTED: | |||
mq->cont_cls = cont_cls; | GNUNET_break (0); | |||
/* FIXME: this memcpy can be up to 7% of our total runtime! */ | free_neighbour (n, GNUNET_NO); | |||
memcpy (&mq[1], msg, msg_size); | return; | |||
mq->message_buf = (const char *) &mq[1]; | case S_INIT_ATS: | |||
mq->message_buf_size = msg_size; | set_address (&n->primary_address, | |||
mq->timeout = GNUNET_TIME_relative_to_absolute (timeout); | address, session, bandwidth_in, bandwidth_out, GNUNET_NO); | |||
GNUNET_CONTAINER_DLL_insert_tail (n->messages_head, n->messages_tail, mq) | n->state = S_INIT_BLACKLIST; | |||
; | n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEO | |||
UT); | ||||
if ((GNUNET_SCHEDULER_NO_TASK == n->transmission_task) && | check_blacklist (&n->id, | |||
(NULL == n->is_active)) | n->connect_ack_timestamp, | |||
n->transmission_task = GNUNET_SCHEDULER_add_now (&transmission_task, n) | address, session, ats, ats_count); | |||
; | break; | |||
case S_INIT_BLACKLIST: | ||||
/* ATS suggests a different address, switch again */ | ||||
set_address (&n->primary_address, | ||||
address, session, bandwidth_in, bandwidth_out, GNUNET_NO); | ||||
n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEO | ||||
UT); | ||||
check_blacklist (&n->id, | ||||
n->connect_ack_timestamp, | ||||
address, session, ats, ats_count); | ||||
break; | ||||
case S_CONNECT_SENT: | ||||
/* ATS suggests a different address, switch again */ | ||||
set_address (&n->primary_address, | ||||
address, session, bandwidth_in, bandwidth_out, GNUNET_NO); | ||||
n->state = S_INIT_BLACKLIST; | ||||
n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEO | ||||
UT); | ||||
check_blacklist (&n->id, | ||||
n->connect_ack_timestamp, | ||||
address, session, ats, ats_count); | ||||
break; | ||||
case S_CONNECT_RECV_ATS: | ||||
set_address (&n->primary_address, | ||||
address, session, bandwidth_in, bandwidth_out, GNUNET_NO); | ||||
n->state = S_CONNECT_RECV_BLACKLIST; | ||||
n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEO | ||||
UT); | ||||
check_blacklist (&n->id, | ||||
n->connect_ack_timestamp, | ||||
address, session, ats, ats_count); | ||||
break; | ||||
case S_CONNECT_RECV_BLACKLIST: | ||||
case S_CONNECT_RECV_ACK: | ||||
/* ATS asks us to switch while we were trying to connect; switch to new | ||||
address and check blacklist again */ | ||||
set_address (&n->primary_address, | ||||
address, session, bandwidth_in, bandwidth_out, GNUNET_NO); | ||||
n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEO | ||||
UT); | ||||
check_blacklist (&n->id, | ||||
n->connect_ack_timestamp, | ||||
address, session, ats, ats_count); | ||||
break; | ||||
case S_CONNECTED: | ||||
GNUNET_assert (NULL != n->primary_address.address); | ||||
GNUNET_assert (NULL != n->primary_address.session); | ||||
if (n->primary_address.session == session) | ||||
{ | ||||
/* not an address change, just a quota change */ | ||||
set_address (&n->primary_address, | ||||
address, session, bandwidth_in, bandwidth_out, GNUNET_YES | ||||
); | ||||
break; | ||||
} | ||||
/* ATS asks us to switch a life connection; see if we can get | ||||
a CONNECT_ACK on it before we actually do this! */ | ||||
set_address (&n->alternative_address, | ||||
address, session, bandwidth_in, bandwidth_out, GNUNET_NO); | ||||
n->state = S_CONNECTED_SWITCHING_BLACKLIST; | ||||
check_blacklist (&n->id, | ||||
GNUNET_TIME_absolute_get (), | ||||
address, session, ats, ats_count); | ||||
break; | ||||
case S_RECONNECT_ATS: | ||||
set_address (&n->primary_address, | ||||
address, session, bandwidth_in, bandwidth_out, GNUNET_NO); | ||||
n->state = S_RECONNECT_BLACKLIST; | ||||
n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEO | ||||
UT); | ||||
check_blacklist (&n->id, | ||||
n->connect_ack_timestamp, | ||||
address, session, ats, ats_count); | ||||
break; | ||||
case S_RECONNECT_BLACKLIST: | ||||
/* ATS asks us to switch while we were trying to reconnect; switch to n | ||||
ew | ||||
address and check blacklist again */ | ||||
set_address (&n->primary_address, | ||||
address, session, bandwidth_in, bandwidth_out, GNUNET_NO); | ||||
n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEO | ||||
UT); | ||||
check_blacklist (&n->id, | ||||
n->connect_ack_timestamp, | ||||
address, session, ats, ats_count); | ||||
break; | ||||
case S_RECONNECT_SENT: | ||||
/* ATS asks us to switch while we were trying to reconnect; switch to n | ||||
ew | ||||
address and check blacklist again */ | ||||
set_address (&n->primary_address, | ||||
address, session, bandwidth_in, bandwidth_out, GNUNET_NO); | ||||
n->state = S_RECONNECT_BLACKLIST; | ||||
n->timeout = GNUNET_TIME_relative_to_absolute (BLACKLIST_RESPONSE_TIMEO | ||||
UT); | ||||
check_blacklist (&n->id, | ||||
n->connect_ack_timestamp, | ||||
address, session, ats, ats_count); | ||||
break; | ||||
case S_CONNECTED_SWITCHING_BLACKLIST: | ||||
if (n->primary_address.session == session) | ||||
{ | ||||
/* ATS switches back to still-active session */ | ||||
free_address (&n->alternative_address); | ||||
n->state = S_CONNECTED; | ||||
break; | ||||
} | ||||
/* ATS asks us to switch a life connection, update blacklist check */ | ||||
set_address (&n->alternative_address, | ||||
address, session, bandwidth_in, bandwidth_out, GNUNET_NO); | ||||
check_blacklist (&n->id, | ||||
GNUNET_TIME_absolute_get (), | ||||
address, session, ats, ats_count); | ||||
break; | ||||
case S_CONNECTED_SWITCHING_CONNECT_SENT: | ||||
if (n->primary_address.session == session) | ||||
{ | ||||
/* ATS switches back to still-active session */ | ||||
free_address (&n->alternative_address); | ||||
n->state = S_CONNECTED; | ||||
break; | ||||
} | ||||
/* ATS asks us to switch a life connection, update blacklist check */ | ||||
set_address (&n->alternative_address, | ||||
address, session, bandwidth_in, bandwidth_out, GNUNET_NO); | ||||
n->state = S_CONNECTED_SWITCHING_BLACKLIST; | ||||
check_blacklist (&n->id, | ||||
GNUNET_TIME_absolute_get (), | ||||
address, session, ats, ats_count); | ||||
break; | ||||
case S_DISCONNECT: | ||||
/* not going to switch addresses while disconnecting */ | ||||
return; | ||||
case S_DISCONNECT_FINISHED: | ||||
GNUNET_assert (0); | ||||
break; | ||||
default: | ||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_st | ||||
ate (n->state)); | ||||
GNUNET_break (0); | ||||
break; | ||||
} | ||||
} | } | |||
/** | /** | |||
* We have received a message from the given sender. How long should | * Master task run for every neighbour. Performs all of the time-related | |||
* we delay before receiving more? (Also used to keep the peer marked | * activities (keep alive, send next message, disconnect if idle, finish | |||
* as live). | * clean up after disconnect). | |||
* | * | |||
* @param sender sender of the message | * @param cls the 'struct NeighbourMapEntry' for which we are running | |||
* @param size size of the message | * @param tc scheduler context (unused) | |||
* @param do_forward set to GNUNET_YES if the message should be forwarded t | ||||
o clients | ||||
* GNUNET_NO if the neighbour is not connected or violate | ||||
s the quota, | ||||
* GNUNET_SYSERR if the connection is not fully up yet | ||||
* @return how long to wait before reading more from this sender | ||||
*/ | */ | |||
struct GNUNET_TIME_Relative | static void | |||
GST_neighbours_calculate_receive_delay (const struct GNUNET_PeerIdentity | master_task (void *cls, | |||
*sender, ssize_t size, int *do_forw | const struct GNUNET_SCHEDULER_TaskContext *tc) | |||
ard) | ||||
{ | { | |||
struct NeighbourMapEntry *n; | struct NeighbourMapEntry *n = cls; | |||
struct GNUNET_TIME_Relative ret; | struct GNUNET_TIME_Relative delay; | |||
// This can happen during shutdown | ||||
if (neighbours == NULL) | ||||
{ | ||||
return GNUNET_TIME_UNIT_FOREVER_REL; | ||||
} | ||||
n = lookup_neighbour (sender); | n->task = GNUNET_SCHEDULER_NO_TASK; | |||
if (n == NULL) | delay = GNUNET_TIME_absolute_get_remaining (n->timeout); | |||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"master task runs for neighbour `%s' in state %d with timeout | ||||
in %llu ms\n", | ||||
GNUNET_i2s (&n->id), | ||||
n->state, | ||||
(unsigned long long) delay.rel_value); | ||||
switch (n->state) | ||||
{ | { | |||
GST_neighbours_try_connect (sender); | case S_NOT_CONNECTED: | |||
n = lookup_neighbour (sender); | /* invalid state for master task, clean up */ | |||
if (NULL == n) | GNUNET_break (0); | |||
n->state = S_DISCONNECT_FINISHED; | ||||
free_neighbour (n, GNUNET_NO); | ||||
return; | ||||
case S_INIT_ATS: | ||||
if (0 == delay.rel_value) | ||||
{ | { | |||
GNUNET_STATISTICS_update (GST_stats, | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
gettext_noop | "Connection to `%s' timed out waiting for ATS to provide a | |||
("# messages discarded due to lack of neigh | ddress\n", | |||
bour record"), | GNUNET_i2s (&n->id)); | |||
1, GNUNET_NO); | n->state = S_DISCONNECT_FINISHED; | |||
*do_forward = GNUNET_NO; | free_neighbour (n, GNUNET_NO); | |||
return GNUNET_TIME_UNIT_ZERO; | return; | |||
} | } | |||
break; | ||||
case S_INIT_BLACKLIST: | ||||
if (0 == delay.rel_value) | ||||
{ | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Connection to `%s' timed out waiting for BLACKLIST to app | ||||
rove address\n", | ||||
GNUNET_i2s (&n->id)); | ||||
n->state = S_DISCONNECT_FINISHED; | ||||
free_neighbour (n, GNUNET_NO); | ||||
return; | ||||
} | ||||
break; | ||||
case S_CONNECT_SENT: | ||||
if (0 == delay.rel_value) | ||||
{ | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Connection to `%s' timed out waiting for other peer to se | ||||
nd CONNECT_ACK\n", | ||||
GNUNET_i2s (&n->id)); | ||||
disconnect_neighbour (n); | ||||
return; | ||||
} | ||||
break; | ||||
case S_CONNECT_RECV_ATS: | ||||
if (0 == delay.rel_value) | ||||
{ | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Connection to `%s' timed out waiting ATS to provide addre | ||||
ss to use for CONNECT_ACK\n", | ||||
GNUNET_i2s (&n->id)); | ||||
n->state = S_DISCONNECT_FINISHED; | ||||
free_neighbour (n, GNUNET_NO); | ||||
return; | ||||
} | ||||
break; | ||||
case S_CONNECT_RECV_BLACKLIST: | ||||
if (0 == delay.rel_value) | ||||
{ | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Connection to `%s' timed out waiting BLACKLIST to approve | ||||
address to use for CONNECT_ACK\n", | ||||
GNUNET_i2s (&n->id)); | ||||
n->state = S_DISCONNECT_FINISHED; | ||||
free_neighbour (n, GNUNET_NO); | ||||
return; | ||||
} | ||||
break; | ||||
case S_CONNECT_RECV_ACK: | ||||
if (0 == delay.rel_value) | ||||
{ | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Connection to `%s' timed out waiting for other peer to se | ||||
nd SESSION_ACK\n", | ||||
GNUNET_i2s (&n->id)); | ||||
disconnect_neighbour (n); | ||||
return; | ||||
} | ||||
break; | ||||
case S_CONNECTED: | ||||
if (0 == delay.rel_value) | ||||
{ | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Connection to `%s' timed out, missing KEEPALIVE_RESPONSEs | ||||
\n", | ||||
GNUNET_i2s (&n->id)); | ||||
disconnect_neighbour (n); | ||||
return; | ||||
} | ||||
try_transmission_to_peer (n); | ||||
send_keepalive (n); | ||||
break; | ||||
case S_RECONNECT_ATS: | ||||
if (0 == delay.rel_value) | ||||
{ | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Connection to `%s' timed out, waiting for ATS replacement | ||||
address\n", | ||||
GNUNET_i2s (&n->id)); | ||||
disconnect_neighbour (n); | ||||
return; | ||||
} | ||||
break; | ||||
case S_RECONNECT_BLACKLIST: | ||||
if (0 == delay.rel_value) | ||||
{ | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Connection to `%s' timed out, waiting for BLACKLIST to ap | ||||
prove replacement address\n", | ||||
GNUNET_i2s (&n->id)); | ||||
disconnect_neighbour (n); | ||||
return; | ||||
} | ||||
break; | ||||
case S_RECONNECT_SENT: | ||||
if (0 == delay.rel_value) | ||||
{ | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Connection to `%s' timed out, waiting for other peer to C | ||||
ONNECT_ACK replacement address\n", | ||||
GNUNET_i2s (&n->id)); | ||||
disconnect_neighbour (n); | ||||
return; | ||||
} | ||||
break; | ||||
case S_CONNECTED_SWITCHING_BLACKLIST: | ||||
if (0 == delay.rel_value) | ||||
{ | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Connection to `%s' timed out, missing KEEPALIVE_RESPONSEs | ||||
\n", | ||||
GNUNET_i2s (&n->id)); | ||||
disconnect_neighbour (n); | ||||
return; | ||||
} | ||||
try_transmission_to_peer (n); | ||||
send_keepalive (n); | ||||
break; | ||||
case S_CONNECTED_SWITCHING_CONNECT_SENT: | ||||
if (0 == delay.rel_value) | ||||
{ | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Connection to `%s' timed out, missing KEEPALIVE_RESPONSEs | ||||
(after trying to CONNECT on alternative address)\n", | ||||
GNUNET_i2s (&n->id)); | ||||
disconnect_neighbour (n); | ||||
return; | ||||
} | ||||
try_transmission_to_peer (n); | ||||
send_keepalive (n); | ||||
break; | ||||
case S_DISCONNECT: | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Cleaning up connection to `%s' after sending DISCONNECT\n", | ||||
GNUNET_i2s (&n->id)); | ||||
n->state = S_DISCONNECT_FINISHED; | ||||
free_neighbour (n, GNUNET_NO); | ||||
return; | ||||
case S_DISCONNECT_FINISHED: | ||||
/* how did we get here!? */ | ||||
GNUNET_assert (0); | ||||
break; | ||||
default: | ||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_st | ||||
ate (n->state)); | ||||
GNUNET_break (0); | ||||
break; | ||||
} | } | |||
if (!is_connected (n)) | if ( (S_CONNECTED_SWITCHING_CONNECT_SENT == n->state) || | |||
{ | (S_CONNECTED_SWITCHING_BLACKLIST == n->state) || | |||
*do_forward = GNUNET_SYSERR; | (S_CONNECTED == n->state) ) | |||
return GNUNET_TIME_UNIT_ZERO; | ||||
} | ||||
if (GNUNET_YES == GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, size) | ||||
) | ||||
{ | { | |||
n->quota_violation_count++; | /* if we are *now* in one of these three states, we're sending | |||
#if DEBUG_TRANSPORT | keep alive messages, so we need to consider the keepalive | |||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | delay, not just the connection timeout */ | |||
"Bandwidth quota (%u b/s) violation detected (total of %u). | delay = GNUNET_TIME_relative_min (GNUNET_TIME_absolute_get_remaining (n | |||
\n", | ->keep_alive_time), | |||
n->in_tracker.available_bytes_per_s__, | delay); | |||
n->quota_violation_count); | ||||
#endif | ||||
/* Discount 32k per violation */ | ||||
GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, -32 * 1024); | ||||
} | } | |||
else | if (GNUNET_SCHEDULER_NO_TASK == n->task) | |||
n->task = GNUNET_SCHEDULER_add_delayed (delay, | ||||
&master_task, | ||||
n); | ||||
} | ||||
/** | ||||
* Send a SESSION_ACK message to the neighbour to confirm that we | ||||
* got his CONNECT_ACK. | ||||
* | ||||
* @param n neighbour to send the SESSION_ACK to | ||||
*/ | ||||
static void | ||||
send_session_ack_message (struct NeighbourMapEntry *n) | ||||
{ | ||||
struct GNUNET_MessageHeader msg; | ||||
msg.size = htons (sizeof (struct GNUNET_MessageHeader)); | ||||
msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK); | ||||
(void) send_with_session(n, | ||||
(const char *) &msg, sizeof (struct GNUNET_Messag | ||||
eHeader), | ||||
UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, | ||||
NULL, NULL); | ||||
} | ||||
/** | ||||
* We received a 'SESSION_CONNECT_ACK' message from the other peer. | ||||
* Consider switching to it. | ||||
* | ||||
* @param message possibly a 'struct SessionConnectMessage' (check format) | ||||
* @param peer identity of the peer to switch the address for | ||||
* @param address address of the other peer, NULL if other peer | ||||
* connected to us | ||||
* @param session session to use (or NULL) | ||||
* @param ats performance data | ||||
* @param ats_count number of entries in ats | ||||
*/ | ||||
void | ||||
GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *messa | ||||
ge, | ||||
const struct GNUNET_PeerIdentity *peer, | ||||
const struct GNUNET_HELLO_Address *addre | ||||
ss, | ||||
struct Session *session, | ||||
const struct GNUNET_ATS_Information *ats | ||||
, | ||||
uint32_t ats_count) | ||||
{ | ||||
const struct SessionConnectMessage *scm; | ||||
struct GNUNET_TIME_Absolute ts; | ||||
struct NeighbourMapEntry *n; | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Received CONNECT_ACK message from peer `%s'\n", | ||||
GNUNET_i2s (peer)); | ||||
if (ntohs (message->size) != sizeof (struct SessionConnectMessage)) | ||||
{ | { | |||
if (n->quota_violation_count > 0) | GNUNET_break_op (0); | |||
{ | return; | |||
/* try to add 32k back */ | ||||
GNUNET_BANDWIDTH_tracker_consume (&n->in_tracker, 32 * 1024); | ||||
n->quota_violation_count--; | ||||
} | ||||
} | } | |||
if (n->quota_violation_count > QUOTA_VIOLATION_DROP_THRESHOLD) | scm = (const struct SessionConnectMessage *) message; | |||
GNUNET_break_op (ntohl (scm->reserved) == 0); | ||||
if (NULL == (n = lookup_neighbour (peer))) | ||||
{ | { | |||
GNUNET_STATISTICS_update (GST_stats, | GNUNET_STATISTICS_update (GST_stats, | |||
gettext_noop | gettext_noop | |||
("# bandwidth quota violations by other peers "), | ("# unexpected CONNECT_ACK messages (no peer) "), | |||
1, GNUNET_NO); | 1, GNUNET_NO); | |||
*do_forward = GNUNET_NO; | return; | |||
return GNUNET_CONSTANTS_QUOTA_VIOLATION_TIMEOUT; | ||||
} | } | |||
*do_forward = GNUNET_YES; | ts = GNUNET_TIME_absolute_ntoh (scm->timestamp); | |||
ret = GNUNET_BANDWIDTH_tracker_get_delay (&n->in_tracker, 32 * 1024); | switch (n->state) | |||
if (ret.rel_value > 0) | ||||
{ | { | |||
#if DEBUG_TRANSPORT | case S_NOT_CONNECTED: | |||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | GNUNET_break (0); | |||
"Throttling read (%llu bytes excess at %u b/s), waiting %ll | free_neighbour (n, GNUNET_NO); | |||
u ms before reading more.\n", | return; | |||
(unsigned long long) n->in_tracker. | case S_INIT_ATS: | |||
consumption_since_last_update__, | case S_INIT_BLACKLIST: | |||
(unsigned int) n->in_tracker.available_bytes_per_s__, | ||||
(unsigned long long) ret.rel_value); | ||||
#endif | ||||
GNUNET_STATISTICS_update (GST_stats, | GNUNET_STATISTICS_update (GST_stats, | |||
gettext_noop ("# ms throttling suggested"), | gettext_noop | |||
(int64_t) ret.rel_value, GNUNET_NO); | ("# unexpected CONNECT_ACK messages (not read | |||
y)"), | ||||
1, GNUNET_NO); | ||||
break; | ||||
case S_CONNECT_SENT: | ||||
if (ts.abs_value != n->primary_address.connect_timestamp.abs_value) | ||||
break; /* ACK does not match our original CONNECT message */ | ||||
n->state = S_CONNECTED; | ||||
n->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CO | ||||
NNECTION_TIMEOUT); | ||||
GNUNET_STATISTICS_set (GST_stats, | ||||
gettext_noop ("# peers connected"), | ||||
++neighbours_connected, | ||||
GNUNET_NO); | ||||
connect_notify_cb (callback_cls, &n->id, ats, ats_count); | ||||
set_address (&n->primary_address, | ||||
n->primary_address.address, | ||||
n->primary_address.session, | ||||
n->primary_address.bandwidth_in, | ||||
n->primary_address.bandwidth_out, | ||||
GNUNET_YES); | ||||
send_session_ack_message (n); | ||||
break; | ||||
case S_CONNECT_RECV_ATS: | ||||
case S_CONNECT_RECV_BLACKLIST: | ||||
case S_CONNECT_RECV_ACK: | ||||
GNUNET_STATISTICS_update (GST_stats, | ||||
gettext_noop | ||||
("# unexpected CONNECT_ACK messages (not read | ||||
y)"), | ||||
1, GNUNET_NO); | ||||
break; | ||||
case S_CONNECTED: | ||||
/* duplicate CONNECT_ACK, let's answer by duplciate SESSION_ACK just in | ||||
case */ | ||||
send_session_ack_message (n); | ||||
break; | ||||
case S_RECONNECT_ATS: | ||||
case S_RECONNECT_BLACKLIST: | ||||
/* we didn't expect any CONNECT_ACK, as we are waiting for ATS | ||||
to give us a new address... */ | ||||
GNUNET_STATISTICS_update (GST_stats, | ||||
gettext_noop | ||||
("# unexpected CONNECT_ACK messages (waiting | ||||
on ATS)"), | ||||
1, GNUNET_NO); | ||||
break; | ||||
case S_RECONNECT_SENT: | ||||
/* new address worked; go back to connected! */ | ||||
n->state = S_CONNECTED; | ||||
send_session_ack_message (n); | ||||
break; | ||||
case S_CONNECTED_SWITCHING_BLACKLIST: | ||||
/* duplicate CONNECT_ACK, let's answer by duplciate SESSION_ACK just in | ||||
case */ | ||||
send_session_ack_message (n); | ||||
break; | ||||
case S_CONNECTED_SWITCHING_CONNECT_SENT: | ||||
/* new address worked; adopt it and go back to connected! */ | ||||
n->state = S_CONNECTED; | ||||
n->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CO | ||||
NNECTION_TIMEOUT); | ||||
GNUNET_break (GNUNET_NO == n->alternative_address.ats_active); | ||||
set_address (&n->primary_address, | ||||
n->alternative_address.address, | ||||
n->alternative_address.session, | ||||
n->alternative_address.bandwidth_in, | ||||
n->alternative_address.bandwidth_out, | ||||
GNUNET_YES); | ||||
free_address (&n->alternative_address); | ||||
send_session_ack_message (n); | ||||
break; | ||||
case S_DISCONNECT: | ||||
GNUNET_STATISTICS_update (GST_stats, | ||||
gettext_noop | ||||
("# unexpected CONNECT_ACK messages (disconne | ||||
cting)"), | ||||
1, GNUNET_NO); | ||||
break; | ||||
case S_DISCONNECT_FINISHED: | ||||
GNUNET_assert (0); | ||||
break; | ||||
default: | ||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_st | ||||
ate (n->state)); | ||||
GNUNET_break (0); | ||||
break; | ||||
} | } | |||
return ret; | ||||
} | } | |||
/** | /** | |||
* Keep the connection to the given neighbour alive longer, | * A session was terminated. Take note; if needed, try to get | |||
* we received a KEEPALIVE (or equivalent). | * an alternative address from ATS. | |||
* | * | |||
* @param neighbour neighbour to keep alive | * @param peer identity of the peer where the session died | |||
* @param session session that is gone | ||||
*/ | */ | |||
void | void | |||
GST_neighbours_keepalive (const struct GNUNET_PeerIdentity *neighbour) | GST_neighbours_session_terminated (const struct GNUNET_PeerIdentity *peer, | |||
struct Session *session) | ||||
{ | { | |||
struct NeighbourMapEntry *n; | struct NeighbourMapEntry *n; | |||
struct BlackListCheckContext *bcc; | ||||
struct BlackListCheckContext *bcc_next; | ||||
// This can happen during shutdown | /* make sure to cancel all ongoing blacklist checks involving 'session' * | |||
if (neighbours == NULL) | / | |||
bcc_next = bc_head; | ||||
while (NULL != (bcc = bcc_next)) | ||||
{ | { | |||
return; | bcc_next = bcc->next; | |||
if (bcc->na.session == session) | ||||
{ | ||||
GST_blacklist_test_cancel (bcc->bc); | ||||
GNUNET_HELLO_address_free (bcc->na.address); | ||||
GNUNET_CONTAINER_DLL_remove (bc_head, | ||||
bc_tail, | ||||
bcc); | ||||
GNUNET_free (bcc); | ||||
} | ||||
} | } | |||
if (NULL == (n = lookup_neighbour (peer))) | ||||
return; /* can't affect us */ | ||||
if (session != n->primary_address.session) | ||||
{ | ||||
if (session == n->alternative_address.session) | ||||
{ | ||||
free_address (&n->alternative_address); | ||||
if ( (S_CONNECTED_SWITCHING_BLACKLIST == n->state) || | ||||
(S_CONNECTED_SWITCHING_CONNECT_SENT == n->state) ) | ||||
n->state = S_CONNECTED; | ||||
else | ||||
GNUNET_break (0); | ||||
} | ||||
return; /* doesn't affect us further */ | ||||
} | ||||
n->expect_latency_response = GNUNET_NO; | ||||
n = lookup_neighbour (neighbour); | switch (n->state) | |||
if (NULL == n) | ||||
{ | { | |||
GNUNET_STATISTICS_update (GST_stats, | case S_NOT_CONNECTED: | |||
gettext_noop | GNUNET_break (0); | |||
("# KEEPALIVE messages discarded (not connect | free_neighbour (n, GNUNET_NO); | |||
ed)"), | ||||
1, GNUNET_NO); | ||||
return; | return; | |||
} | case S_INIT_ATS: | |||
GNUNET_SCHEDULER_cancel (n->timeout_task); | GNUNET_break (0); | |||
n->timeout_task = | free_neighbour (n, GNUNET_NO); | |||
GNUNET_SCHEDULER_add_delayed (GNUNET_CONSTANTS_IDLE_CONNECTION_TIMEOU | ||||
T, | ||||
&neighbour_timeout_task, n); | ||||
/* send reply to measure latency */ | ||||
if (S_CONNECTED != n->state) | ||||
return; | return; | |||
case S_INIT_BLACKLIST: | ||||
struct GNUNET_MessageHeader m; | case S_CONNECT_SENT: | |||
free_address (&n->primary_address); | ||||
m.size = htons (sizeof (struct GNUNET_MessageHeader)); | n->state = S_INIT_ATS; | |||
m.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_KEEPALIVE_RESPONSE) | n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); | |||
; | // FIXME: need to ask ATS for suggestions again? | |||
GNUNET_ATS_suggest_address (GST_ats, &n->id); | ||||
send_with_session(n, | break; | |||
(const void *) &m, sizeof (m), | case S_CONNECT_RECV_ATS: | |||
UINT32_MAX, | case S_CONNECT_RECV_BLACKLIST: | |||
GNUNET_TIME_UNIT_FOREVER_REL, | case S_CONNECT_RECV_ACK: | |||
NULL, NULL); | /* error on inbound session; free neighbour entirely */ | |||
free_address (&n->primary_address); | ||||
free_neighbour (n, GNUNET_NO); | ||||
return; | ||||
case S_CONNECTED: | ||||
free_address (&n->primary_address); | ||||
n->state = S_RECONNECT_ATS; | ||||
n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); | ||||
/* FIXME: is this ATS call needed? */ | ||||
GNUNET_ATS_suggest_address (GST_ats, &n->id); | ||||
break; | ||||
case S_RECONNECT_ATS: | ||||
/* we don't have an address, how can it go down? */ | ||||
GNUNET_break (0); | ||||
break; | ||||
case S_RECONNECT_BLACKLIST: | ||||
case S_RECONNECT_SENT: | ||||
n->state = S_RECONNECT_ATS; | ||||
n->timeout = GNUNET_TIME_relative_to_absolute (ATS_RESPONSE_TIMEOUT); | ||||
// FIXME: need to ask ATS for suggestions again? | ||||
GNUNET_ATS_suggest_address (GST_ats, &n->id); | ||||
break; | ||||
case S_CONNECTED_SWITCHING_BLACKLIST: | ||||
/* primary went down while we were checking secondary against | ||||
blacklist, adopt secondary as primary */ | ||||
free_address (&n->primary_address); | ||||
n->primary_address = n->alternative_address; | ||||
memset (&n->alternative_address, 0, sizeof (struct NeighbourAddress)); | ||||
n->timeout = GNUNET_TIME_relative_to_absolute (FAST_RECONNECT_TIMEOUT); | ||||
n->state = S_RECONNECT_BLACKLIST; | ||||
break; | ||||
case S_CONNECTED_SWITCHING_CONNECT_SENT: | ||||
/* primary went down while we were waiting for CONNECT_ACK on secondary | ||||
; | ||||
secondary as primary */ | ||||
free_address (&n->primary_address); | ||||
n->primary_address = n->alternative_address; | ||||
memset (&n->alternative_address, 0, sizeof (struct NeighbourAddress)); | ||||
n->timeout = GNUNET_TIME_relative_to_absolute (FAST_RECONNECT_TIMEOUT); | ||||
n->state = S_RECONNECT_SENT; | ||||
break; | ||||
case S_DISCONNECT: | ||||
free_address (&n->primary_address); | ||||
break; | ||||
case S_DISCONNECT_FINISHED: | ||||
/* neighbour was freed and plugins told to terminate session */ | ||||
break; | ||||
default: | ||||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_st | ||||
ate (n->state)); | ||||
GNUNET_break (0); | ||||
break; | ||||
} | ||||
if (GNUNET_SCHEDULER_NO_TASK != n->task) | ||||
GNUNET_SCHEDULER_cancel (n->task); | ||||
n->task = GNUNET_SCHEDULER_add_now (&master_task, n); | ||||
} | } | |||
/** | /** | |||
* We received a KEEP_ALIVE_RESPONSE message and use this to calculate late | * We received a 'SESSION_ACK' message from the other peer. | |||
ncy | * If we sent a 'CONNECT_ACK' last, this means we are now | |||
* to this peer | * connected. Otherwise, do nothing. | |||
* | * | |||
* @param neighbour neighbour to keep alive | * @param message possibly a 'struct SessionConnectMessage' (check format) | |||
* @param peer identity of the peer to switch the address for | ||||
* @param address address of the other peer, NULL if other peer | ||||
* connected to us | ||||
* @param session session to use (or NULL) | ||||
* @param ats performance data | * @param ats performance data | |||
* @param ats_count number of entries in ats | * @param ats_count number of entries in ats | |||
*/ | */ | |||
void | void | |||
GST_neighbours_keepalive_response (const struct GNUNET_PeerIdentity *neighb | GST_neighbours_handle_session_ack (const struct GNUNET_MessageHeader *messa | |||
our, | ge, | |||
const struct GNUNET_ATS_Information *ats | const struct GNUNET_PeerIdentity *peer, | |||
, | const struct GNUNET_HELLO_Address *addres | |||
uint32_t ats_count) | s, | |||
struct Session *session, | ||||
const struct GNUNET_ATS_Information *ats, | ||||
uint32_t ats_count) | ||||
{ | { | |||
struct NeighbourMapEntry *n; | struct NeighbourMapEntry *n; | |||
struct GNUNET_ATS_Information *ats_new; | ||||
uint32_t latency; | ||||
if (neighbours == NULL) | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
"Received SESSION_ACK message from peer `%s'\n", | ||||
GNUNET_i2s (peer)); | ||||
if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader)) | ||||
{ | { | |||
// This can happen during shutdown | GNUNET_break_op (0); | |||
return; | return; | |||
} | } | |||
if (NULL == (n = lookup_neighbour (peer))) | ||||
n = lookup_neighbour (neighbour); | ||||
if ((NULL == n) || (n->state != S_CONNECTED)) | ||||
{ | ||||
GNUNET_STATISTICS_update (GST_stats, | ||||
gettext_noop | ||||
("# KEEPALIVE_RESPONSE messages discarded (no | ||||
t connected)"), | ||||
1, GNUNET_NO); | ||||
return; | return; | |||
} | /* check if we are in a plausible state for having sent | |||
if (n->expect_latency_response != GNUNET_YES) | a CONNECT_ACK. If not, return, otherwise break */ | |||
if ( ( (S_CONNECT_RECV_ACK != n->state) && | ||||
(S_CONNECT_SENT != n->state) ) || | ||||
(2 != n->send_connect_ack) ) | ||||
{ | { | |||
GNUNET_STATISTICS_update (GST_stats, | GNUNET_STATISTICS_update (GST_stats, | |||
gettext_noop | gettext_noop ("# unexpected SESSION ACK messa | |||
("# KEEPALIVE_RESPONSE messages discarded (no | ges"), 1, | |||
t expected)"), | GNUNET_NO); | |||
1, GNUNET_NO); | ||||
return; | return; | |||
} | } | |||
n->expect_latency_response = GNUNET_NO; | n->state = S_CONNECTED; | |||
n->timeout = GNUNET_TIME_relative_to_absolute (GNUNET_CONSTANTS_IDLE_CONN | ||||
GNUNET_assert (n->keep_alive_sent.abs_value != | ECTION_TIMEOUT); | |||
GNUNET_TIME_absolute_get_zero ().abs_value); | GNUNET_STATISTICS_set (GST_stats, | |||
n->latency = | gettext_noop ("# peers connected"), | |||
GNUNET_TIME_absolute_get_difference (n->keep_alive_sent, | ++neighbours_connected, | |||
GNUNET_TIME_absolute_get ()); | GNUNET_NO); | |||
#if DEBUG_TRANSPORT | connect_notify_cb (callback_cls, &n->id, ats, ats_count); | |||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Latency for peer `%s' is %llu ms\n" | set_address (&n->primary_address, | |||
, | n->primary_address.address, | |||
GNUNET_i2s (&n->id), n->latency.rel_value); | n->primary_address.session, | |||
#endif | n->primary_address.bandwidth_in, | |||
n->primary_address.bandwidth_out, | ||||
if (n->latency.rel_value == GNUNET_TIME_relative_get_forever ().rel_value | GNUNET_YES); | |||
) | } | |||
{ | ||||
GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats, ats_co | ||||
unt); | ||||
} | ||||
else | ||||
{ | ||||
ats_new = | ||||
GNUNET_malloc (sizeof (struct GNUNET_ATS_Information) * | ||||
(ats_count + 1)); | ||||
memcpy (ats_new, ats, sizeof (struct GNUNET_ATS_Information) * ats_coun | ||||
t); | ||||
/* add latency */ | ||||
ats_new[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY); | ||||
if (n->latency.rel_value > UINT32_MAX) | ||||
latency = UINT32_MAX; | ||||
else | ||||
latency = n->latency.rel_value; | ||||
ats_new[ats_count].value = htonl (latency); | ||||
GNUNET_ATS_address_update (GST_ats, n->address, n->session, ats_new, | /** | |||
ats_count + 1); | * Test if we're connected to the given peer. | |||
GNUNET_free (ats_new); | * | |||
} | * @param target peer to test | |||
* @return GNUNET_YES if we are connected, GNUNET_NO if not | ||||
*/ | ||||
int | ||||
GST_neighbours_test_connected (const struct GNUNET_PeerIdentity *target) | ||||
{ | ||||
return test_connected (lookup_neighbour (target)); | ||||
} | } | |||
/** | /** | |||
* Change the incoming quota for the given peer. | * Change the incoming quota for the given peer. | |||
* | * | |||
* @param neighbour identity of peer to change qutoa for | * @param neighbour identity of peer to change qutoa for | |||
* @param quota new quota | * @param quota new quota | |||
*/ | */ | |||
void | void | |||
GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighb our, | GST_neighbours_set_incoming_quota (const struct GNUNET_PeerIdentity *neighb our, | |||
struct GNUNET_BANDWIDTH_Value32NBO quota ) | struct GNUNET_BANDWIDTH_Value32NBO quota ) | |||
{ | { | |||
struct NeighbourMapEntry *n; | struct NeighbourMapEntry *n; | |||
// This can happen during shutdown | if (NULL == (n = lookup_neighbour (neighbour))) | |||
if (neighbours == NULL) | ||||
{ | ||||
return; | ||||
} | ||||
n = lookup_neighbour (neighbour); | ||||
if (n == NULL) | ||||
{ | { | |||
GNUNET_STATISTICS_update (GST_stats, | GNUNET_STATISTICS_update (GST_stats, | |||
gettext_noop | gettext_noop | |||
("# SET QUOTA messages ignored (no such peer) "), | ("# SET QUOTA messages ignored (no such peer) "), | |||
1, GNUNET_NO); | 1, GNUNET_NO); | |||
return; | return; | |||
} | } | |||
#if DEBUG_TRANSPORT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
"Setting inbound quota of %u Bps for peer `%s' to all clients \n", | "Setting inbound quota of %u Bps for peer `%s' to all clients \n", | |||
ntohl (quota.value__), GNUNET_i2s (&n->id)); | ntohl (quota.value__), GNUNET_i2s (&n->id)); | |||
#endif | ||||
GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota); | GNUNET_BANDWIDTH_tracker_update_quota (&n->in_tracker, quota); | |||
if (0 != ntohl (quota.value__)) | if (0 != ntohl (quota.value__)) | |||
return; | return; | |||
#if DEBUG_TRANSPORT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s '\n", | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Disconnecting peer `%4s' due to `%s '\n", | |||
GNUNET_i2s (&n->id), "SET_QUOTA"); | GNUNET_i2s (&n->id), "SET_QUOTA"); | |||
#endif | if (GNUNET_YES == test_connected (n)) | |||
if (is_connected (n)) | ||||
GNUNET_STATISTICS_update (GST_stats, | GNUNET_STATISTICS_update (GST_stats, | |||
gettext_noop ("# disconnects due to quota of 0"), | gettext_noop ("# disconnects due to quota of 0"), | |||
1, GNUNET_NO); | 1, GNUNET_NO); | |||
disconnect_neighbour (n); | disconnect_neighbour (n); | |||
} | } | |||
/** | /** | |||
* Closure for the neighbours_iterate function. | ||||
*/ | ||||
struct IteratorContext | ||||
{ | ||||
/** | ||||
* Function to call on each connected neighbour. | ||||
*/ | ||||
GST_NeighbourIterator cb; | ||||
/** | ||||
* Closure for 'cb'. | ||||
*/ | ||||
void *cb_cls; | ||||
}; | ||||
/** | ||||
* Call the callback from the closure for each connected neighbour. | ||||
* | ||||
* @param cls the 'struct IteratorContext' | ||||
* @param key the hash of the public key of the neighbour | ||||
* @param value the 'struct NeighbourMapEntry' | ||||
* @return GNUNET_OK (continue to iterate) | ||||
*/ | ||||
static int | ||||
neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value) | ||||
{ | ||||
struct IteratorContext *ic = cls; | ||||
struct NeighbourMapEntry *n = value; | ||||
if (!is_connected (n)) | ||||
return GNUNET_OK; | ||||
ic->cb (ic->cb_cls, &n->id, NULL, 0, n->address); | ||||
return GNUNET_OK; | ||||
} | ||||
/** | ||||
* Iterate over all connected neighbours. | ||||
* | ||||
* @param cb function to call | ||||
* @param cb_cls closure for cb | ||||
*/ | ||||
void | ||||
GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls) | ||||
{ | ||||
struct IteratorContext ic; | ||||
// This can happen during shutdown | ||||
if (neighbours == NULL) | ||||
{ | ||||
return; | ||||
} | ||||
ic.cb = cb; | ||||
ic.cb_cls = cb_cls; | ||||
GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, & | ||||
ic); | ||||
} | ||||
/** | ||||
* If we have an active connection to the given target, it must be shutdown | ||||
. | ||||
* | ||||
* @param target peer to disconnect from | ||||
*/ | ||||
void | ||||
GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target) | ||||
{ | ||||
struct NeighbourMapEntry *n; | ||||
// This can happen during shutdown | ||||
if (neighbours == NULL) | ||||
{ | ||||
return; | ||||
} | ||||
n = lookup_neighbour (target); | ||||
if (NULL == n) | ||||
return; /* not active */ | ||||
disconnect_neighbour (n); | ||||
} | ||||
/** | ||||
* We received a disconnect message from the given peer, | * We received a disconnect message from the given peer, | |||
* validate and process. | * validate and process. | |||
* | * | |||
* @param peer sender of the message | * @param peer sender of the message | |||
* @param msg the disconnect message | * @param msg the disconnect message | |||
*/ | */ | |||
void | void | |||
GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity | GST_neighbours_handle_disconnect_message (const struct GNUNET_PeerIdentity | |||
*peer, | *peer, | |||
const struct GNUNET_MessageHeader | const struct GNUNET_MessageHeader | |||
*msg) | *msg) | |||
{ | { | |||
struct NeighbourMapEntry *n; | struct NeighbourMapEntry *n; | |||
const struct SessionDisconnectMessage *sdm; | const struct SessionDisconnectMessage *sdm; | |||
GNUNET_HashCode hc; | GNUNET_HashCode hc; | |||
#if DEBUG_TRANSPORT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Received DISCONNECT message from peer `%s'\n", | ||||
GNUNET_i2s (peer)); | ||||
#endif | ||||
if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage)) | ||||
{ | ||||
// GNUNET_break_op (0); | ||||
GNUNET_STATISTICS_update (GST_stats, | ||||
gettext_noop | ||||
("# disconnect messages ignored (old format)" | ||||
), 1, | ||||
GNUNET_NO); | ||||
return; | ||||
} | ||||
sdm = (const struct SessionDisconnectMessage *) msg; | ||||
n = lookup_neighbour (peer); | ||||
if (NULL == n) | ||||
return; /* gone already */ | ||||
if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <= | ||||
n->connect_ts.abs_value) | ||||
{ | ||||
GNUNET_STATISTICS_update (GST_stats, | ||||
gettext_noop | ||||
("# disconnect messages ignored (timestamp)") | ||||
, 1, | ||||
GNUNET_NO); | ||||
return; | ||||
} | ||||
GNUNET_CRYPTO_hash (&sdm->public_key, | ||||
sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncode | ||||
d), | ||||
&hc); | ||||
if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity))) | ||||
{ | ||||
GNUNET_break_op (0); | ||||
return; | ||||
} | ||||
if (ntohl (sdm->purpose.size) != | ||||
sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) + | ||||
sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) + | ||||
sizeof (struct GNUNET_TIME_AbsoluteNBO)) | ||||
{ | ||||
GNUNET_break_op (0); | ||||
return; | ||||
} | ||||
if (GNUNET_OK != | ||||
GNUNET_CRYPTO_rsa_verify | ||||
(GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose, | ||||
&sdm->signature, &sdm->public_key)) | ||||
{ | ||||
GNUNET_break_op (0); | ||||
return; | ||||
} | ||||
GST_neighbours_force_disconnect (peer); | ||||
} | ||||
/** | ||||
* We received a 'SESSION_CONNECT_ACK' message from the other peer. | ||||
* Consider switching to it. | ||||
* | ||||
* @param message possibly a 'struct SessionConnectMessage' (check format) | ||||
* @param peer identity of the peer to switch the address for | ||||
* @param address address of the other peer, NULL if other peer | ||||
* connected to us | ||||
* @param session session to use (or NULL) | ||||
* @param ats performance data | ||||
* @param ats_count number of entries in ats | ||||
*/ | ||||
void | ||||
GST_neighbours_handle_connect_ack (const struct GNUNET_MessageHeader *messa | ||||
ge, | ||||
const struct GNUNET_PeerIdentity *peer, | ||||
const struct GNUNET_HELLO_Address *addre | ||||
ss, | ||||
struct Session *session, | ||||
const struct GNUNET_ATS_Information *ats | ||||
, | ||||
uint32_t ats_count) | ||||
{ | ||||
const struct SessionConnectMessage *scm; | ||||
struct GNUNET_MessageHeader msg; | ||||
struct NeighbourMapEntry *n; | ||||
size_t msg_len; | ||||
size_t ret; | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Received CONNECT_ACK message from peer `%s'\n", | ||||
GNUNET_i2s (peer)); | ||||
if (ntohs (message->size) != sizeof (struct SessionConnectMessage)) | ||||
{ | ||||
GNUNET_break_op (0); | ||||
return; | ||||
} | ||||
scm = (const struct SessionConnectMessage *) message; | ||||
GNUNET_break_op (ntohl (scm->reserved) == 0); | ||||
n = lookup_neighbour (peer); | ||||
if (NULL == n) | ||||
{ | ||||
/* we did not send 'CONNECT' -- at least not recently */ | ||||
GNUNET_STATISTICS_update (GST_stats, | ||||
gettext_noop | ||||
("# unexpected CONNECT_ACK messages (no peer) | ||||
"), | ||||
1, GNUNET_NO); | ||||
return; | ||||
} | ||||
/* Additional check | ||||
* | ||||
* ((n->state != S_CONNECT_RECV) && (n->address != NULL)): | ||||
* | ||||
* We also received an CONNECT message, switched from SENDT to RECV and | ||||
* ATS already suggested us an address after a successful blacklist check | ||||
*/ | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
"Received CONNECT_ACK message from peer `%s' in state `%s'\n" | "Received DISCONNECT message from peer `%s'\n", | |||
, | GNUNET_i2s (peer)); | |||
GNUNET_i2s (peer), | if (ntohs (msg->size) != sizeof (struct SessionDisconnectMessage)) | |||
print_state(n->state)); | ||||
if ((n->address != NULL) && (n->state == S_CONNECTED)) | ||||
{ | { | |||
/* After fast reconnect: send ACK (ACK) even when we are connected */ | // GNUNET_break_op (0); | |||
msg_len = sizeof (msg); | GNUNET_STATISTICS_update (GST_stats, | |||
msg.size = htons (msg_len); | gettext_noop | |||
msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK); | ("# disconnect messages ignored (old format)" | |||
), 1, | ||||
ret = send_with_session(n, | GNUNET_NO); | |||
(const char *) &msg, msg_len, | ||||
UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, | ||||
NULL, NULL); | ||||
if (ret == GNUNET_SYSERR) | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Failed to send SESSION_ACK to `%4s' using address '%s' s | ||||
ession %X\n", | ||||
GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->ses | ||||
sion); | ||||
return; | return; | |||
} | } | |||
sdm = (const struct SessionDisconnectMessage *) msg; | ||||
if ((n->state != S_CONNECT_SENT) && | if (NULL == (n = lookup_neighbour (peer))) | |||
((n->state != S_CONNECT_RECV) && (n->address != NULL))) | return; /* gone already */ | |||
if (GNUNET_TIME_absolute_ntoh (sdm->timestamp).abs_value <= n->connect_ac | ||||
k_timestamp.abs_value) | ||||
{ | { | |||
GNUNET_STATISTICS_update (GST_stats, | GNUNET_STATISTICS_update (GST_stats, | |||
gettext_noop | gettext_noop | |||
("# unexpected CONNECT_ACK messages"), 1, | ("# disconnect messages ignored (timestamp)") , 1, | |||
GNUNET_NO); | GNUNET_NO); | |||
return; | return; | |||
} | } | |||
if (n->state != S_CONNECTED) | GNUNET_CRYPTO_hash (&sdm->public_key, | |||
change_state (n, S_CONNECTED); | sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncode | |||
d), | ||||
if (NULL != session) | &hc); | |||
{ | if (0 != memcmp (peer, &hc, sizeof (struct GNUNET_PeerIdentity))) | |||
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, | ||||
"transport-ats", | ||||
"Giving ATS session %p of plugin %s for peer %s\n", | ||||
session, address->transport_name, GNUNET_i2s (peer)); | ||||
} | ||||
GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count); | ||||
GNUNET_assert (NULL != n->address); | ||||
if ((n->address_state == FRESH) && (0 == GNUNET_HELLO_address_cmp(address | ||||
, n->address))) | ||||
{ | ||||
GST_validation_set_address_use (n->address, n->session, GNUNET_YES); | ||||
GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES) | ||||
; | ||||
n->address_state = USED; | ||||
} | ||||
GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in); | ||||
/* send ACK (ACK) */ | ||||
msg_len = sizeof (msg); | ||||
msg.size = htons (msg_len); | ||||
msg.type = htons (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_ACK); | ||||
ret = send_with_session(n, | ||||
(const char *) &msg, msg_len, | ||||
UINT32_MAX, GNUNET_TIME_UNIT_FOREVER_REL, | ||||
NULL, NULL); | ||||
if (ret == GNUNET_SYSERR) | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Failed to send SESSION_ACK to `%4s' using address '%s' ses | ||||
sion %X\n", | ||||
GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->sessi | ||||
on); | ||||
if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK) | ||||
n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task | ||||
, n); | ||||
neighbours_connected++; | ||||
GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), | ||||
1, | ||||
GNUNET_NO); | ||||
#if DEBUG_TRANSPORT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Notify about connect of `%4s' using address '%s' session %X | ||||
LINE %u\n", | ||||
GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session | ||||
, | ||||
__LINE__); | ||||
#endif | ||||
connect_notify_cb (callback_cls, &n->id, ats, ats_count); | ||||
send_outbound_quota (peer, n->bandwidth_out); | ||||
} | ||||
void | ||||
GST_neighbours_handle_ack (const struct GNUNET_MessageHeader *message, | ||||
const struct GNUNET_PeerIdentity *peer, | ||||
const struct GNUNET_HELLO_Address *address, | ||||
struct Session *session, | ||||
const struct GNUNET_ATS_Information *ats, | ||||
uint32_t ats_count) | ||||
{ | ||||
struct NeighbourMapEntry *n; | ||||
#if DEBUG_TRANSPORT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Received ACK message from peer `%s' | ||||
\n", | ||||
GNUNET_i2s (peer)); | ||||
#endif | ||||
if (ntohs (message->size) != sizeof (struct GNUNET_MessageHeader)) | ||||
{ | { | |||
GNUNET_break_op (0); | GNUNET_break_op (0); | |||
return; | return; | |||
} | } | |||
n = lookup_neighbour (peer); | if (ntohl (sdm->purpose.size) != | |||
if (NULL == n) | sizeof (struct GNUNET_CRYPTO_RsaSignaturePurpose) + | |||
sizeof (struct GNUNET_CRYPTO_RsaPublicKeyBinaryEncoded) + | ||||
sizeof (struct GNUNET_TIME_AbsoluteNBO)) | ||||
{ | { | |||
GNUNET_break (0); | GNUNET_break_op (0); | |||
return; | return; | |||
} | } | |||
if (S_CONNECTED == n->state) | if (GNUNET_OK != | |||
return; | GNUNET_CRYPTO_rsa_verify | |||
if (!is_connecting (n)) | (GNUNET_MESSAGE_TYPE_TRANSPORT_SESSION_DISCONNECT, &sdm->purpose, | |||
&sdm->signature, &sdm->public_key)) | ||||
{ | { | |||
GNUNET_STATISTICS_update (GST_stats, | GNUNET_break_op (0); | |||
gettext_noop ("# unexpected ACK messages"), 1 | ||||
, | ||||
GNUNET_NO); | ||||
return; | return; | |||
} | } | |||
change_state (n, S_CONNECTED); | if (GNUNET_YES == test_connected (n)) | |||
if (NULL != session) | GNUNET_STATISTICS_update (GST_stats, | |||
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, | gettext_noop | |||
"transport-ats", | ("# other peer asked to disconnect from us"), | |||
"Giving ATS session %p of plugin %s for peer %s\n", | 1, | |||
session, address->transport_name, GNUNET_i2s (peer)); | GNUNET_NO); | |||
GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count); | disconnect_neighbour (n); | |||
GNUNET_assert (n->address != NULL); | ||||
if ((n->address_state == FRESH) && (0 == GNUNET_HELLO_address_cmp(address | ||||
, n->address))) | ||||
{ | ||||
GST_validation_set_address_use (n->address, n->session, GNUNET_YES); | ||||
GNUNET_ATS_address_in_use (GST_ats, n->address, n->session, GNUNET_YES) | ||||
; | ||||
n->address_state = USED; | ||||
} | ||||
neighbours_connected++; | ||||
GNUNET_STATISTICS_update (GST_stats, gettext_noop ("# peers connected"), | ||||
1, | ||||
GNUNET_NO); | ||||
GST_neighbours_set_incoming_quota (&n->id, n->bandwidth_in); | ||||
if (n->keepalive_task == GNUNET_SCHEDULER_NO_TASK) | ||||
n->keepalive_task = GNUNET_SCHEDULER_add_now (&neighbour_keepalive_task | ||||
, n); | ||||
#if DEBUG_TRANSPORT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | ||||
"Notify about connect of `%4s' using address '%s' session %X | ||||
LINE %u\n", | ||||
GNUNET_i2s (&n->id), GST_plugins_a2s (n->address), n->session | ||||
, | ||||
__LINE__); | ||||
#endif | ||||
connect_notify_cb (callback_cls, &n->id, ats, ats_count); | ||||
send_outbound_quota (peer, n->bandwidth_out); | ||||
} | } | |||
struct BlackListCheckContext | /** | |||
* Closure for the neighbours_iterate function. | ||||
*/ | ||||
struct IteratorContext | ||||
{ | { | |||
struct GNUNET_ATS_Information *ats; | /** | |||
* Function to call on each connected neighbour. | ||||
*/ | ||||
GST_NeighbourIterator cb; | ||||
uint32_t ats_count; | /** | |||
* Closure for 'cb'. | ||||
*/ | ||||
void *cb_cls; | ||||
}; | ||||
struct Session *session; | /** | |||
* Call the callback from the closure for each connected neighbour. | ||||
* | ||||
* @param cls the 'struct IteratorContext' | ||||
* @param key the hash of the public key of the neighbour | ||||
* @param value the 'struct NeighbourMapEntry' | ||||
* @return GNUNET_OK (continue to iterate) | ||||
*/ | ||||
static int | ||||
neighbours_iterate (void *cls, const GNUNET_HashCode * key, void *value) | ||||
{ | ||||
struct IteratorContext *ic = cls; | ||||
struct NeighbourMapEntry *n = value; | ||||
struct GNUNET_HELLO_Address *address; | if (GNUNET_YES == test_connected (n)) | |||
ic->cb (ic->cb_cls, &n->id, NULL, 0, n->primary_address.address); | ||||
return GNUNET_OK; | ||||
} | ||||
struct GNUNET_TIME_Absolute ts; | /** | |||
}; | * Iterate over all connected neighbours. | |||
* | ||||
* @param cb function to call | ||||
* @param cb_cls closure for cb | ||||
*/ | ||||
void | ||||
GST_neighbours_iterate (GST_NeighbourIterator cb, void *cb_cls) | ||||
{ | ||||
struct IteratorContext ic; | ||||
static void | if (NULL == neighbours) | |||
handle_connect_blacklist_cont (void *cls, | return; /* can happen during shutdown */ | |||
const struct GNUNET_PeerIdentity *peer, | ic.cb = cb; | |||
int result) | ic.cb_cls = cb_cls; | |||
GNUNET_CONTAINER_multihashmap_iterate (neighbours, &neighbours_iterate, & | ||||
ic); | ||||
} | ||||
/** | ||||
* If we have an active connection to the given target, it must be shutdown | ||||
. | ||||
* | ||||
* @param target peer to disconnect from | ||||
*/ | ||||
void | ||||
GST_neighbours_force_disconnect (const struct GNUNET_PeerIdentity *target) | ||||
{ | { | |||
struct NeighbourMapEntry *n; | struct NeighbourMapEntry *n; | |||
struct BlackListCheckContext *bcc = cls; | ||||
#if DEBUG_TRANSPORT | if (NULL == (n = lookup_neighbour (target))) | |||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | return; /* not active */ | |||
"Blacklist check due to CONNECT message: `%s'\n", | if (GNUNET_YES == test_connected (n)) | |||
GNUNET_i2s (peer), | GNUNET_STATISTICS_update (GST_stats, | |||
(result == GNUNET_OK) ? "ALLOWED" : "FORBIDDEN"); | gettext_noop | |||
#endif | ("# disconnected from peer upon explicit reque | |||
st"), 1, | ||||
GNUNET_NO); | ||||
disconnect_neighbour (n); | ||||
} | ||||
/* not allowed */ | /** | |||
if (GNUNET_OK != result) | * Obtain current latency information for the given neighbour. | |||
{ | * | |||
GNUNET_HELLO_address_free (bcc->address); | * @param peer to get the latency for | |||
GNUNET_free (bcc); | * @return observed latency of the address, FOREVER if the | |||
return; | * the connection is not up | |||
} | */ | |||
struct GNUNET_TIME_Relative | ||||
GST_neighbour_get_latency (const struct GNUNET_PeerIdentity *peer) | ||||
{ | ||||
struct NeighbourMapEntry *n; | ||||
n = lookup_neighbour (peer); | n = lookup_neighbour (peer); | |||
if (NULL == n) | if (NULL == n) | |||
n = setup_neighbour (peer); | return GNUNET_TIME_UNIT_FOREVER_REL; | |||
switch (n->state) | ||||
if (bcc->ts.abs_value > n->connect_ts.abs_value) | ||||
{ | { | |||
if (NULL != bcc->session) | case S_CONNECTED: | |||
GNUNET_log_from (GNUNET_ERROR_TYPE_DEBUG | GNUNET_ERROR_TYPE_BULK, | case S_RECONNECT_SENT: | |||
"transport-ats", | case S_RECONNECT_ATS: | |||
"Giving ATS session %p of address `%s' for peer %s\n | return n->latency; | |||
", | case S_NOT_CONNECTED: | |||
bcc->session, GST_plugins_a2s (bcc->address), | case S_INIT_BLACKLIST: | |||
GNUNET_i2s (peer)); | case S_INIT_ATS: | |||
/* Tell ATS about the session, so ATS can suggest it if it likes it. */ | case S_CONNECT_SENT: | |||
case S_CONNECT_RECV_BLACKLIST: | ||||
GNUNET_ATS_address_update (GST_ats, bcc->address, bcc->session, bcc->at | case S_DISCONNECT: | |||
s, | case S_DISCONNECT_FINISHED: | |||
bcc->ats_count); | return GNUNET_TIME_UNIT_FOREVER_REL; | |||
n->connect_ts = bcc->ts; | default: | |||
GNUNET_log (GNUNET_ERROR_TYPE_ERROR, "Unhandled state `%s' \n",print_st | ||||
ate (n->state)); | ||||
GNUNET_break (0); | ||||
break; | ||||
} | } | |||
return GNUNET_TIME_UNIT_FOREVER_REL; | ||||
GNUNET_HELLO_address_free (bcc->address); | ||||
GNUNET_free (bcc); | ||||
if (n->state != S_CONNECT_RECV) | ||||
change_state (n, S_CONNECT_RECV); | ||||
/* Ask ATS for an address to connect via that address */ | ||||
if (n->ats_suggest != GNUNET_SCHEDULER_NO_TASK) | ||||
GNUNET_SCHEDULER_cancel (n->ats_suggest); | ||||
n->ats_suggest = | ||||
GNUNET_SCHEDULER_add_delayed (ATS_RESPONSE_TIMEOUT, ats_suggest_cance | ||||
l, | ||||
n); | ||||
GNUNET_ATS_suggest_address (GST_ats, peer); | ||||
} | } | |||
/** | /** | |||
* We received a 'SESSION_CONNECT' message from the other peer. | * Obtain current address information for the given neighbour. | |||
* Consider switching to it. | ||||
* | * | |||
* @param message possibly a 'struct SessionConnectMessage' (check format) | * @param peer | |||
* @param peer identity of the peer to switch the address for | * @return address currently used | |||
* @param address address of the other peer, NULL if other peer | ||||
* connected to us | ||||
* @param session session to use (or NULL) | ||||
* @param ats performance data | ||||
* @param ats_count number of entries in ats (excluding 0-termination) | ||||
*/ | */ | |||
void | struct GNUNET_HELLO_Address * | |||
GST_neighbours_handle_connect (const struct GNUNET_MessageHeader *message, | GST_neighbour_get_current_address (const struct GNUNET_PeerIdentity *peer) | |||
const struct GNUNET_PeerIdentity *peer, | ||||
const struct GNUNET_HELLO_Address *address, | ||||
struct Session *session, | ||||
const struct GNUNET_ATS_Information *ats, | ||||
uint32_t ats_count) | ||||
{ | { | |||
const struct SessionConnectMessage *scm; | ||||
struct BlackListCheckContext *bcc = NULL; | ||||
struct NeighbourMapEntry *n; | struct NeighbourMapEntry *n; | |||
#if DEBUG_TRANSPORT | n = lookup_neighbour (peer); | |||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | if (NULL == n) | |||
"Received CONNECT message from peer `%s'\n", GNUNET_i2s (peer | return NULL; | |||
)); | return n->primary_address.address; | |||
#endif | } | |||
if (ntohs (message->size) != sizeof (struct SessionConnectMessage)) | /** | |||
{ | * Initialize the neighbours subsystem. | |||
GNUNET_break_op (0); | * | |||
return; | * @param cls closure for callbacks | |||
} | * @param connect_cb function to call if we connect to a peer | |||
* @param disconnect_cb function to call if we disconnect from a peer | ||||
* @param peer_address_cb function to call if we change an active address | ||||
* of a neighbour | ||||
*/ | ||||
void | ||||
GST_neighbours_start (void *cls, | ||||
GNUNET_TRANSPORT_NotifyConnect connect_cb, | ||||
GNUNET_TRANSPORT_NotifyDisconnect disconnect_cb, | ||||
GNUNET_TRANSPORT_PeerIterateCallback peer_address_cb) | ||||
{ | ||||
callback_cls = cls; | ||||
connect_notify_cb = connect_cb; | ||||
disconnect_notify_cb = disconnect_cb; | ||||
address_change_cb = peer_address_cb; | ||||
neighbours = GNUNET_CONTAINER_multihashmap_create (NEIGHBOUR_TABLE_SIZE); | ||||
} | ||||
scm = (const struct SessionConnectMessage *) message; | /** | |||
GNUNET_break_op (ntohl (scm->reserved) == 0); | * Disconnect from the given neighbour. | |||
* | ||||
* @param cls unused | ||||
* @param key hash of neighbour's public key (not used) | ||||
* @param value the 'struct NeighbourMapEntry' of the neighbour | ||||
* @return GNUNET_OK (continue to iterate) | ||||
*/ | ||||
static int | ||||
disconnect_all_neighbours (void *cls, const GNUNET_HashCode * key, void *va | ||||
lue) | ||||
{ | ||||
struct NeighbourMapEntry *n = value; | ||||
GNUNET_ATS_address_update (GST_ats, address, session, ats, ats_count); | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
"Disconnecting peer `%4s', %s\n", | ||||
GNUNET_i2s (&n->id), "SHUTDOWN_TASK"); | ||||
n->state = S_DISCONNECT_FINISHED; | ||||
free_neighbour (n, GNUNET_NO); | ||||
return GNUNET_OK; | ||||
} | ||||
n = lookup_neighbour (peer); | /** | |||
if ((n != NULL) && ((S_CONNECTED == n->state) || (S_FAST_RECONNECT == n-> | * Cleanup the neighbours subsystem. | |||
state))) | */ | |||
{ | void | |||
/* connected peer switches addresses or is trying to do a fast reconnec | GST_neighbours_stop () | |||
t*/ | { | |||
if (NULL == neighbours) | ||||
return; | return; | |||
} | GNUNET_CONTAINER_multihashmap_iterate (neighbours, | |||
&disconnect_all_neighbours, | ||||
/* we are not connected to this peer */ | NULL); | |||
/* do blacklist check */ | GNUNET_CONTAINER_multihashmap_destroy (neighbours); | |||
bcc = | neighbours = NULL; | |||
GNUNET_malloc (sizeof (struct BlackListCheckContext) + | callback_cls = NULL; | |||
sizeof (struct GNUNET_ATS_Information) * (ats_count + | connect_notify_cb = NULL; | |||
1)); | disconnect_notify_cb = NULL; | |||
bcc->ts = GNUNET_TIME_absolute_ntoh (scm->timestamp); | address_change_cb = NULL; | |||
bcc->ats_count = ats_count + 1; | ||||
bcc->address = GNUNET_HELLO_address_copy (address); | ||||
bcc->session = session; | ||||
bcc->ats = (struct GNUNET_ATS_Information *) &bcc[1]; | ||||
memcpy (bcc->ats, ats, sizeof (struct GNUNET_ATS_Information) * ats_count | ||||
); | ||||
bcc->ats[ats_count].type = htonl (GNUNET_ATS_QUALITY_NET_DELAY); | ||||
bcc->ats[ats_count].value = | ||||
htonl ((uint32_t) GST_neighbour_get_latency (peer).rel_value); | ||||
GST_blacklist_test_allowed (peer, address->transport_name, | ||||
&handle_connect_blacklist_cont, bcc); | ||||
} | } | |||
/* end of file gnunet-service-transport_neighbours.c */ | /* end of file gnunet-service-transport_neighbours.c */ | |||
End of changes. 329 change blocks. | ||||
2011 lines changed or deleted | 2429 lines changed or added | |||
This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/ |