gnunet-service-dht_clients.c | gnunet-service-dht_clients.c | |||
---|---|---|---|---|
skipping to change at line 88 | skipping to change at line 88 | |||
/** | /** | |||
* The handle to this client | * The handle to this client | |||
*/ | */ | |||
struct GNUNET_SERVER_Client *client_handle; | struct GNUNET_SERVER_Client *client_handle; | |||
/** | /** | |||
* Handle to the current transmission request, NULL | * Handle to the current transmission request, NULL | |||
* if none pending. | * if none pending. | |||
*/ | */ | |||
struct GNUNET_CONNECTION_TransmitHandle *transmit_handle; | struct GNUNET_SERVER_TransmitHandle *transmit_handle; | |||
/** | /** | |||
* Linked list of pending messages for this client | * Linked list of pending messages for this client | |||
*/ | */ | |||
struct PendingMessage *pending_head; | struct PendingMessage *pending_head; | |||
/** | /** | |||
* Tail of linked list of pending messages for this client | * Tail of linked list of pending messages for this client | |||
*/ | */ | |||
struct PendingMessage *pending_tail; | struct PendingMessage *pending_tail; | |||
skipping to change at line 203 | skipping to change at line 203 | |||
* Type of blocks that are of interest | * Type of blocks that are of interest | |||
*/ | */ | |||
enum GNUNET_BLOCK_Type type; | enum GNUNET_BLOCK_Type type; | |||
/** | /** | |||
* Key of data of interest, NULL for all. | * Key of data of interest, NULL for all. | |||
*/ | */ | |||
GNUNET_HashCode *key; | GNUNET_HashCode *key; | |||
/** | /** | |||
* Flag whether to notify about GET messages. | ||||
*/ | ||||
int16_t get; | ||||
/** | ||||
* Flag whether to notify about GET_REPONSE messages. | ||||
*/ | ||||
int16_t get_resp; | ||||
/** | ||||
* Flag whether to notify about PUT messages. | ||||
*/ | ||||
uint16_t put; | ||||
/** | ||||
* Client to notify of these requests. | * Client to notify of these requests. | |||
*/ | */ | |||
struct ClientList *client; | struct ClientList *client; | |||
}; | }; | |||
/** | /** | |||
* List of active clients. | * List of active clients. | |||
*/ | */ | |||
static struct ClientList *client_head; | static struct ClientList *client_head; | |||
skipping to change at line 244 | skipping to change at line 259 | |||
* Heap with all of our client's request, sorted by retry time (earliest on top). | * Heap with all of our client's request, sorted by retry time (earliest on top). | |||
*/ | */ | |||
static struct GNUNET_CONTAINER_Heap *retry_heap; | static struct GNUNET_CONTAINER_Heap *retry_heap; | |||
/** | /** | |||
* Task that re-transmits requests (using retry_heap). | * Task that re-transmits requests (using retry_heap). | |||
*/ | */ | |||
static GNUNET_SCHEDULER_TaskIdentifier retry_task; | static GNUNET_SCHEDULER_TaskIdentifier retry_task; | |||
/** | /** | |||
* Task run to check for messages that need to be sent to a client. | ||||
* | ||||
* @param client a ClientList, containing the client and any messages to be | ||||
sent to it | ||||
*/ | ||||
static void | ||||
process_pending_messages (struct ClientList *client); | ||||
/** | ||||
* Add a PendingMessage to the clients list of messages to be sent | ||||
* | ||||
* @param client the active client to send the message to | ||||
* @param pending_message the actual message to send | ||||
*/ | ||||
static void | ||||
add_pending_message (struct ClientList *client, | ||||
struct PendingMessage *pending_message) | ||||
{ | ||||
GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_t | ||||
ail, | ||||
pending_message); | ||||
process_pending_messages (client); | ||||
} | ||||
/** | ||||
* Find a client if it exists, add it otherwise. | * Find a client if it exists, add it otherwise. | |||
* | * | |||
* @param client the server handle to the client | * @param client the server handle to the client | |||
* | * | |||
* @return the client if found, a new client otherwise | * @return the client if found, a new client otherwise | |||
*/ | */ | |||
static struct ClientList * | static struct ClientList * | |||
find_active_client (struct GNUNET_SERVER_Client *client) | find_active_client (struct GNUNET_SERVER_Client *client) | |||
{ | { | |||
struct ClientList *pos = client_head; | struct ClientList *pos = client_head; | |||
skipping to change at line 285 | skipping to change at line 323 | |||
* @return GNUNET_YES (we should continue to iterate) | * @return GNUNET_YES (we should continue to iterate) | |||
*/ | */ | |||
static int | static int | |||
remove_client_records (void *cls, const GNUNET_HashCode * key, void *value) | remove_client_records (void *cls, const GNUNET_HashCode * key, void *value) | |||
{ | { | |||
struct ClientList *client = cls; | struct ClientList *client = cls; | |||
struct ClientQueryRecord *record = value; | struct ClientQueryRecord *record = value; | |||
if (record->client != client) | if (record->client != client) | |||
return GNUNET_YES; | return GNUNET_YES; | |||
#if DEBUG_DHT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
"Removing client %p's record for key %s\n", client, | "Removing client %p's record for key %s\n", client, | |||
GNUNET_h2s (key)); | GNUNET_h2s (key)); | |||
#endif | ||||
GNUNET_assert (GNUNET_YES == | GNUNET_assert (GNUNET_YES == | |||
GNUNET_CONTAINER_multihashmap_remove (forward_map, key, | GNUNET_CONTAINER_multihashmap_remove (forward_map, key, | |||
record)); | record)); | |||
if (NULL != record->hnode) | if (NULL != record->hnode) | |||
GNUNET_CONTAINER_heap_remove_node (record->hnode); | GNUNET_CONTAINER_heap_remove_node (record->hnode); | |||
GNUNET_array_grow (record->seen_replies, record->seen_replies_count, 0); | GNUNET_array_grow (record->seen_replies, record->seen_replies_count, 0); | |||
GNUNET_free (record); | GNUNET_free (record); | |||
return GNUNET_YES; | return GNUNET_YES; | |||
} | } | |||
skipping to change at line 315 | skipping to change at line 351 | |||
* @param client identification of the client; NULL | * @param client identification of the client; NULL | |||
* for the last call when the server is destroyed | * for the last call when the server is destroyed | |||
*/ | */ | |||
static void | static void | |||
handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) | |||
{ | { | |||
struct ClientList *pos; | struct ClientList *pos; | |||
struct PendingMessage *reply; | struct PendingMessage *reply; | |||
struct ClientMonitorRecord *monitor; | struct ClientMonitorRecord *monitor; | |||
#if DEBUG_DHT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Local client %p disconnects\n", cli ent); | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Local client %p disconnects\n", cli ent); | |||
#endif | ||||
pos = find_active_client (client); | pos = find_active_client (client); | |||
GNUNET_CONTAINER_DLL_remove (client_head, client_tail, pos); | GNUNET_CONTAINER_DLL_remove (client_head, client_tail, pos); | |||
if (pos->transmit_handle != NULL) | if (pos->transmit_handle != NULL) | |||
GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->transmit_handle); | GNUNET_SERVER_notify_transmit_ready_cancel (pos->transmit_handle); | |||
while (NULL != (reply = pos->pending_head)) | while (NULL != (reply = pos->pending_head)) | |||
{ | { | |||
GNUNET_CONTAINER_DLL_remove (pos->pending_head, pos->pending_tail, repl y); | GNUNET_CONTAINER_DLL_remove (pos->pending_head, pos->pending_tail, repl y); | |||
GNUNET_free (reply); | GNUNET_free (reply); | |||
} | } | |||
monitor = monitor_head; | monitor = monitor_head; | |||
while (NULL != monitor) | while (NULL != monitor) | |||
{ | { | |||
if (monitor->client == pos) | if (monitor->client == pos) | |||
{ | { | |||
skipping to change at line 441 | skipping to change at line 475 | |||
* @param client the client we received this message from | * @param client the client we received this message from | |||
* @param message the actual message received | * @param message the actual message received | |||
*/ | */ | |||
static void | static void | |||
handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client, | handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client, | |||
const struct GNUNET_MessageHeader *message) | const struct GNUNET_MessageHeader *message) | |||
{ | { | |||
const struct GNUNET_DHT_ClientPutMessage *dht_msg; | const struct GNUNET_DHT_ClientPutMessage *dht_msg; | |||
struct GNUNET_CONTAINER_BloomFilter *peer_bf; | struct GNUNET_CONTAINER_BloomFilter *peer_bf; | |||
uint16_t size; | uint16_t size; | |||
struct PendingMessage *pm; | ||||
struct GNUNET_DHT_ClientPutConfirmationMessage *conf; | ||||
size = ntohs (message->size); | size = ntohs (message->size); | |||
if (size < sizeof (struct GNUNET_DHT_ClientPutMessage)) | if (size < sizeof (struct GNUNET_DHT_ClientPutMessage)) | |||
{ | { | |||
GNUNET_break (0); | GNUNET_break (0); | |||
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | |||
return; | return; | |||
} | } | |||
GNUNET_STATISTICS_update (GDS_stats, | GNUNET_STATISTICS_update (GDS_stats, | |||
gettext_noop | gettext_noop | |||
("# PUT requests received from clients"), 1, | ("# PUT requests received from clients"), 1, | |||
GNUNET_NO); | GNUNET_NO); | |||
dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message; | dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message; | |||
/* give to local clients */ | /* give to local clients */ | |||
#if DEBUG_DHT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
"Handling local PUT of %u-bytes for query %s\n", | "Handling local PUT of %u-bytes for query %s\n", | |||
size - sizeof (struct GNUNET_DHT_ClientPutMessage), | size - sizeof (struct GNUNET_DHT_ClientPutMessage), | |||
GNUNET_h2s (&dht_msg->key)); | GNUNET_h2s (&dht_msg->key)); | |||
#endif | ||||
GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration) , | GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration) , | |||
&dht_msg->key, 0, NULL, 0, NULL, | &dht_msg->key, 0, NULL, 0, NULL, | |||
ntohl (dht_msg->type), | ntohl (dht_msg->type), | |||
size - sizeof (struct GNUNET_DHT_ClientPutMessa ge), | size - sizeof (struct GNUNET_DHT_ClientPutMessa ge), | |||
&dht_msg[1]); | &dht_msg[1]); | |||
/* store locally */ | /* store locally */ | |||
GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration) , | GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration) , | |||
&dht_msg->key, 0, NULL, ntohl (dht_msg->type), | &dht_msg->key, 0, NULL, ntohl (dht_msg->type), | |||
size - sizeof (struct GNUNET_DHT_ClientPutMessa ge), | size - sizeof (struct GNUNET_DHT_ClientPutMessa ge), | |||
&dht_msg[1]); | &dht_msg[1]); | |||
skipping to change at line 482 | skipping to change at line 516 | |||
peer_bf = | peer_bf = | |||
GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, | GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, | |||
GNUNET_CONSTANTS_BLOOMFILTER_K); | GNUNET_CONSTANTS_BLOOMFILTER_K); | |||
GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type), ntohl (dht_msg->options ), | GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type), ntohl (dht_msg->options ), | |||
ntohl (dht_msg->desired_replication_level), | ntohl (dht_msg->desired_replication_level), | |||
GNUNET_TIME_absolute_ntoh (dht_msg->expiration ), | GNUNET_TIME_absolute_ntoh (dht_msg->expiration ), | |||
0 /* hop count */ , | 0 /* hop count */ , | |||
peer_bf, &dht_msg->key, 0, NULL, &dht_msg[1], | peer_bf, &dht_msg->key, 0, NULL, &dht_msg[1], | |||
size - | size - | |||
sizeof (struct GNUNET_DHT_ClientPutMessage)); | sizeof (struct GNUNET_DHT_ClientPutMessage)); | |||
GDS_CLIENTS_process_put (ntohl (dht_msg->options), | ||||
ntohl (dht_msg->type), | ||||
0, | ||||
ntohl (dht_msg->desired_replication_level), | ||||
1, | ||||
GDS_NEIGHBOURS_get_id(), | ||||
GNUNET_TIME_absolute_ntoh (dht_msg->expiration), | ||||
&dht_msg->key, | ||||
&dht_msg[1], | ||||
size - sizeof (struct GNUNET_DHT_ClientPutMessag | ||||
e)); | ||||
GNUNET_CONTAINER_bloomfilter_free (peer_bf); | GNUNET_CONTAINER_bloomfilter_free (peer_bf); | |||
pm = GNUNET_malloc (sizeof (struct PendingMessage) + | ||||
sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage | ||||
)); | ||||
conf = (struct GNUNET_DHT_ClientPutConfirmationMessage *) &pm[1]; | ||||
conf->header.size = htons (sizeof (struct GNUNET_DHT_ClientPutConfirmatio | ||||
nMessage)); | ||||
conf->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK); | ||||
conf->reserved = htonl (0); | ||||
conf->unique_id = dht_msg->unique_id; | ||||
pm->msg = &conf->header; | ||||
add_pending_message (find_active_client (client), pm); | ||||
GNUNET_SERVER_receive_done (client, GNUNET_OK); | GNUNET_SERVER_receive_done (client, GNUNET_OK); | |||
} | } | |||
/** | /** | |||
* Handler for any generic DHT messages, calls the appropriate handler | * Handler for any generic DHT messages, calls the appropriate handler | |||
* depending on message type, sends confirmation if responses aren't otherw ise | * depending on message type, sends confirmation if responses aren't otherw ise | |||
* expected. | * expected. | |||
* | * | |||
* @param cls closure for the service | * @param cls closure for the service | |||
* @param client the client we received this message from | * @param client the client we received this message from | |||
skipping to change at line 519 | skipping to change at line 572 | |||
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); | |||
return; | return; | |||
} | } | |||
xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage); | xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage); | |||
get = (const struct GNUNET_DHT_ClientGetMessage *) message; | get = (const struct GNUNET_DHT_ClientGetMessage *) message; | |||
xquery = (const char *) &get[1]; | xquery = (const char *) &get[1]; | |||
GNUNET_STATISTICS_update (GDS_stats, | GNUNET_STATISTICS_update (GDS_stats, | |||
gettext_noop | gettext_noop | |||
("# GET requests received from clients"), 1, | ("# GET requests received from clients"), 1, | |||
GNUNET_NO); | GNUNET_NO); | |||
#if DEBUG_DHT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
"Received request for %s from local client %p\n", | "Received request for %s from local client %p\n", | |||
GNUNET_h2s (&get->key), client); | GNUNET_h2s (&get->key), client); | |||
#endif | ||||
cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size); | cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size); | |||
cqr->key = get->key; | cqr->key = get->key; | |||
cqr->client = find_active_client (client); | cqr->client = find_active_client (client); | |||
cqr->xquery = (void *) &cqr[1]; | cqr->xquery = (void *) &cqr[1]; | |||
memcpy (&cqr[1], xquery, xquery_size); | memcpy (&cqr[1], xquery, xquery_size); | |||
cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0); | cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0); | |||
cqr->retry_frequency = GNUNET_TIME_UNIT_MILLISECONDS; | cqr->retry_frequency = GNUNET_TIME_UNIT_MILLISECONDS; | |||
cqr->retry_time = GNUNET_TIME_absolute_get (); | cqr->retry_time = GNUNET_TIME_absolute_get (); | |||
cqr->unique_id = get->unique_id; | cqr->unique_id = get->unique_id; | |||
cqr->xquery_size = xquery_size; | cqr->xquery_size = xquery_size; | |||
cqr->replication = ntohl (get->desired_replication_level); | cqr->replication = ntohl (get->desired_replication_level); | |||
cqr->msg_options = ntohl (get->options); | cqr->msg_options = ntohl (get->options); | |||
cqr->type = ntohl (get->type); | cqr->type = ntohl (get->type); | |||
GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr, | GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr, | |||
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MU LTIPLE); | GNUNET_CONTAINER_MULTIHASHMAPOPTION_MU LTIPLE); | |||
GDS_CLIENTS_process_get (ntohl (get->options), | ||||
ntohl (get->type), | ||||
0, | ||||
ntohl (get->desired_replication_level), | ||||
1, | ||||
GDS_NEIGHBOURS_get_id(), | ||||
&get->key); | ||||
/* start remote requests */ | /* start remote requests */ | |||
if (GNUNET_SCHEDULER_NO_TASK != retry_task) | if (GNUNET_SCHEDULER_NO_TASK != retry_task) | |||
GNUNET_SCHEDULER_cancel (retry_task); | GNUNET_SCHEDULER_cancel (retry_task); | |||
retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL) ; | retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL) ; | |||
/* perform local lookup */ | /* perform local lookup */ | |||
GDS_DATACACHE_handle_get (&get->key, cqr->type, cqr->xquery, xquery_size, | GDS_DATACACHE_handle_get (&get->key, cqr->type, cqr->xquery, xquery_size, | |||
NULL, 0); | NULL, 0); | |||
GNUNET_SERVER_receive_done (client, GNUNET_OK); | GNUNET_SERVER_receive_done (client, GNUNET_OK); | |||
} | } | |||
skipping to change at line 582 | skipping to change at line 640 | |||
* @return GNUNET_YES (we should continue to iterate) | * @return GNUNET_YES (we should continue to iterate) | |||
*/ | */ | |||
static int | static int | |||
remove_by_unique_id (void *cls, const GNUNET_HashCode * key, void *value) | remove_by_unique_id (void *cls, const GNUNET_HashCode * key, void *value) | |||
{ | { | |||
const struct RemoveByUniqueIdContext *ctx = cls; | const struct RemoveByUniqueIdContext *ctx = cls; | |||
struct ClientQueryRecord *record = value; | struct ClientQueryRecord *record = value; | |||
if (record->unique_id != ctx->unique_id) | if (record->unique_id != ctx->unique_id) | |||
return GNUNET_YES; | return GNUNET_YES; | |||
#if DEBUG_DHT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
"Removing client %p's record for key %s (by unique id)\n", | "Removing client %p's record for key %s (by unique id)\n", | |||
ctx->client->client_handle, GNUNET_h2s (key)); | ctx->client->client_handle, GNUNET_h2s (key)); | |||
#endif | ||||
return remove_client_records (ctx->client, key, record); | return remove_client_records (ctx->client, key, record); | |||
} | } | |||
/** | /** | |||
* Handler for any generic DHT stop messages, calls the appropriate handler | * Handler for any generic DHT stop messages, calls the appropriate handler | |||
* depending on message type (if processed locally) | * depending on message type (if processed locally) | |||
* | * | |||
* @param cls closure for the service | * @param cls closure for the service | |||
* @param client the client we received this message from | * @param client the client we received this message from | |||
* @param message the actual message received | * @param message the actual message received | |||
skipping to change at line 611 | skipping to change at line 667 | |||
const struct GNUNET_MessageHeader *message) | const struct GNUNET_MessageHeader *message) | |||
{ | { | |||
const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg = | const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg = | |||
(const struct GNUNET_DHT_ClientGetStopMessage *) message; | (const struct GNUNET_DHT_ClientGetStopMessage *) message; | |||
struct RemoveByUniqueIdContext ctx; | struct RemoveByUniqueIdContext ctx; | |||
GNUNET_STATISTICS_update (GDS_stats, | GNUNET_STATISTICS_update (GDS_stats, | |||
gettext_noop | gettext_noop | |||
("# GET STOP requests received from clients"), 1, | ("# GET STOP requests received from clients"), 1, | |||
GNUNET_NO); | GNUNET_NO); | |||
#if DEBUG_DHT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p stopped request for key % s\n", | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p stopped request for key % s\n", | |||
client, GNUNET_h2s (&dht_stop_msg->key)); | client, GNUNET_h2s (&dht_stop_msg->key)); | |||
#endif | ||||
ctx.client = find_active_client (client); | ctx.client = find_active_client (client); | |||
ctx.unique_id = dht_stop_msg->unique_id; | ctx.unique_id = dht_stop_msg->unique_id; | |||
GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, &dht_stop_msg->k ey, | GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, &dht_stop_msg->k ey, | |||
&remove_by_unique_id, &ctx); | &remove_by_unique_id, &ctx); | |||
GNUNET_SERVER_receive_done (client, GNUNET_OK); | GNUNET_SERVER_receive_done (client, GNUNET_OK); | |||
} | } | |||
/** | /** | |||
* Handler for monitor messages | * Handler for monitor start messages | |||
* | * | |||
* @param cls closure for the service | * @param cls closure for the service | |||
* @param client the client we received this message from | * @param client the client we received this message from | |||
* @param message the actual message received | * @param message the actual message received | |||
* | * | |||
*/ | */ | |||
static void | static void | |||
handle_dht_local_monitor (void *cls, struct GNUNET_SERVER_Client *client, | handle_dht_local_monitor (void *cls, struct GNUNET_SERVER_Client *client, | |||
const struct GNUNET_MessageHeader *message) | const struct GNUNET_MessageHeader *message) | |||
{ | { | |||
struct ClientMonitorRecord *r; | struct ClientMonitorRecord *r; | |||
const struct GNUNET_DHT_MonitorMessage *msg; | const struct GNUNET_DHT_MonitorStartStopMessage *msg; | |||
unsigned int i; | ||||
char *c; | ||||
msg = (struct GNUNET_DHT_MonitorMessage *) message; | msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message; | |||
r = GNUNET_malloc (sizeof(struct ClientMonitorRecord)); | r = GNUNET_malloc (sizeof(struct ClientMonitorRecord)); | |||
r->client = find_active_client(client); | r->client = find_active_client(client); | |||
r->type = ntohl(msg->type); | r->type = ntohl(msg->type); | |||
c = (char *) &msg->key; | r->get = ntohs(msg->get); | |||
for (i = 0; i < sizeof (GNUNET_HashCode) && c[i] == 0; i++); | r->get_resp = ntohs(msg->get_resp); | |||
if (sizeof (GNUNET_HashCode) == i) | r->put = ntohs(msg->put); | |||
r->key = NULL; | if (0 == ntohs(msg->filter_key)) | |||
r->key = NULL; | ||||
else | else | |||
{ | { | |||
r->key = GNUNET_malloc (sizeof (GNUNET_HashCode)); | r->key = GNUNET_malloc (sizeof (GNUNET_HashCode)); | |||
memcpy (r->key, &msg->key, sizeof (GNUNET_HashCode)); | memcpy (r->key, &msg->key, sizeof (GNUNET_HashCode)); | |||
} | } | |||
GNUNET_CONTAINER_DLL_insert (monitor_head, monitor_tail, r); | GNUNET_CONTAINER_DLL_insert (monitor_head, monitor_tail, r); | |||
// FIXME add remove somewhere | ||||
GNUNET_SERVER_receive_done (client, GNUNET_OK); | GNUNET_SERVER_receive_done (client, GNUNET_OK); | |||
} | } | |||
/** | /** | |||
* Task run to check for messages that need to be sent to a client. | * Handler for monitor stop messages | |||
* | ||||
* @param cls closure for the service | ||||
* @param client the client we received this message from | ||||
* @param message the actual message received | ||||
* | * | |||
* @param client a ClientList, containing the client and any messages to be sent to it | ||||
*/ | */ | |||
static void | static void | |||
process_pending_messages (struct ClientList *client); | handle_dht_local_monitor_stop (void *cls, struct GNUNET_SERVER_Client *clie | |||
nt, | ||||
const struct GNUNET_MessageHeader *message) | ||||
{ | ||||
struct ClientMonitorRecord *r; | ||||
const struct GNUNET_DHT_MonitorStartStopMessage *msg; | ||||
int keys_match; | ||||
msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message; | ||||
r = monitor_head; | ||||
while (NULL != r) | ||||
{ | ||||
if (NULL == r->key) | ||||
keys_match = (0 == ntohs(msg->filter_key)); | ||||
else | ||||
{ | ||||
keys_match = (0 != ntohs(msg->filter_key) | ||||
&& !memcmp(r->key, &msg->key, sizeof(GNUNET_HashCode) | ||||
)); | ||||
} | ||||
if (find_active_client(client) == r->client | ||||
&& ntohl(msg->type) == r->type | ||||
&& r->get == msg->get | ||||
&& r->get_resp == msg->get_resp | ||||
&& r->put == msg->put | ||||
&& keys_match | ||||
) | ||||
{ | ||||
GNUNET_CONTAINER_DLL_remove (monitor_head, monitor_tail, r); | ||||
GNUNET_free_non_null (r->key); | ||||
GNUNET_free (r); | ||||
GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||||
return; /* Delete only ONE entry */ | ||||
} | ||||
r = r->next; | ||||
} | ||||
GNUNET_SERVER_receive_done (client, GNUNET_OK); | ||||
} | ||||
/** | /** | |||
* Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_r eady | * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_r eady | |||
* request. A ClientList is passed as closure, take the head of the list | * request. A ClientList is passed as closure, take the head of the list | |||
* and copy it into buf, which has the result of sending the message to the | * and copy it into buf, which has the result of sending the message to the | |||
* client. | * client. | |||
* | * | |||
* @param cls closure to this call | * @param cls closure to this call | |||
* @param size maximum number of bytes available to send | * @param size maximum number of bytes available to send | |||
* @param buf where to copy the actual message to | * @param buf where to copy the actual message to | |||
skipping to change at line 691 | skipping to change at line 783 | |||
struct ClientList *client = cls; | struct ClientList *client = cls; | |||
char *cbuf = buf; | char *cbuf = buf; | |||
struct PendingMessage *reply; | struct PendingMessage *reply; | |||
size_t off; | size_t off; | |||
size_t msize; | size_t msize; | |||
client->transmit_handle = NULL; | client->transmit_handle = NULL; | |||
if (buf == NULL) | if (buf == NULL) | |||
{ | { | |||
/* client disconnected */ | /* client disconnected */ | |||
#if DEBUG_DHT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
"Client %p disconnected, pending messages will be discarded \n", | "Client %p disconnected, pending messages will be discarded \n", | |||
client->client_handle); | client->client_handle); | |||
#endif | ||||
return 0; | return 0; | |||
} | } | |||
off = 0; | off = 0; | |||
while ((NULL != (reply = client->pending_head)) && | while ((NULL != (reply = client->pending_head)) && | |||
(size >= off + (msize = ntohs (reply->msg->size)))) | (size >= off + (msize = ntohs (reply->msg->size)))) | |||
{ | { | |||
GNUNET_CONTAINER_DLL_remove (client->pending_head, client->pending_tail , | GNUNET_CONTAINER_DLL_remove (client->pending_head, client->pending_tail , | |||
reply); | reply); | |||
memcpy (&cbuf[off], reply->msg, msize); | memcpy (&cbuf[off], reply->msg, msize); | |||
GNUNET_free (reply); | GNUNET_free (reply); | |||
#if DEBUG_DHT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u bytes to client % p\n", | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u bytes to client % p\n", | |||
msize, client->client_handle); | msize, client->client_handle); | |||
#endif | ||||
off += msize; | off += msize; | |||
} | } | |||
process_pending_messages (client); | process_pending_messages (client); | |||
#if DEBUG_DHT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitted %u/%u bytes to client % p\n", | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitted %u/%u bytes to client % p\n", | |||
(unsigned int) off, (unsigned int) size, client->client_handl e); | (unsigned int) off, (unsigned int) size, client->client_handl e); | |||
#endif | ||||
return off; | return off; | |||
} | } | |||
/** | /** | |||
* Task run to check for messages that need to be sent to a client. | * Task run to check for messages that need to be sent to a client. | |||
* | * | |||
* @param client a ClientList, containing the client and any messages to be sent to it | * @param client a ClientList, containing the client and any messages to be sent to it | |||
*/ | */ | |||
static void | static void | |||
process_pending_messages (struct ClientList *client) | process_pending_messages (struct ClientList *client) | |||
{ | { | |||
if ((client->pending_head == NULL) || (client->transmit_handle != NULL)) | if ((client->pending_head == NULL) || (client->transmit_handle != NULL)) | |||
{ | { | |||
#if DEBUG_DHT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
"Not asking for transmission to %p now: %s\n", | "Not asking for transmission to %p now: %s\n", | |||
client->client_handle, | client->client_handle, | |||
client->pending_head == | client->pending_head == | |||
NULL ? "no more messages" : "request already pending"); | NULL ? "no more messages" : "request already pending"); | |||
#endif | ||||
return; | return; | |||
} | } | |||
#if DEBUG_DHT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
"Asking for transmission of %u bytes to client %p\n", | "Asking for transmission of %u bytes to client %p\n", | |||
ntohs (client->pending_head->msg->size), client->client_handl e); | ntohs (client->pending_head->msg->size), client->client_handl e); | |||
#endif | ||||
client->transmit_handle = | client->transmit_handle = | |||
GNUNET_SERVER_notify_transmit_ready (client->client_handle, | GNUNET_SERVER_notify_transmit_ready (client->client_handle, | |||
ntohs (client->pending_head-> | ntohs (client->pending_head-> | |||
msg->size), | msg->size), | |||
GNUNET_TIME_UNIT_FOREVER_REL, | GNUNET_TIME_UNIT_FOREVER_REL, | |||
&send_reply_to_client, client); | &send_reply_to_client, client); | |||
} | } | |||
/** | /** | |||
* Add a PendingMessage to the clients list of messages to be sent | ||||
* | ||||
* @param client the active client to send the message to | ||||
* @param pending_message the actual message to send | ||||
*/ | ||||
static void | ||||
add_pending_message (struct ClientList *client, | ||||
struct PendingMessage *pending_message) | ||||
{ | ||||
GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_t | ||||
ail, | ||||
pending_message); | ||||
process_pending_messages (client); | ||||
} | ||||
/** | ||||
* Closure for 'forward_reply' | * Closure for 'forward_reply' | |||
*/ | */ | |||
struct ForwardReplyContext | struct ForwardReplyContext | |||
{ | { | |||
/** | /** | |||
* Actual message to send to matching clients. | * Actual message to send to matching clients. | |||
*/ | */ | |||
struct PendingMessage *pm; | struct PendingMessage *pm; | |||
skipping to change at line 825 | skipping to change at line 892 | |||
struct ClientQueryRecord *record = value; | struct ClientQueryRecord *record = value; | |||
struct PendingMessage *pm; | struct PendingMessage *pm; | |||
struct GNUNET_DHT_ClientResultMessage *reply; | struct GNUNET_DHT_ClientResultMessage *reply; | |||
enum GNUNET_BLOCK_EvaluationResult eval; | enum GNUNET_BLOCK_EvaluationResult eval; | |||
int do_free; | int do_free; | |||
GNUNET_HashCode ch; | GNUNET_HashCode ch; | |||
unsigned int i; | unsigned int i; | |||
if ((record->type != GNUNET_BLOCK_TYPE_ANY) && (record->type != frc->type )) | if ((record->type != GNUNET_BLOCK_TYPE_ANY) && (record->type != frc->type )) | |||
{ | { | |||
#if DEBUG_DHT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
"Record type missmatch, not passing request for key %s to l ocal client\n", | "Record type missmatch, not passing request for key %s to l ocal client\n", | |||
GNUNET_h2s (key)); | GNUNET_h2s (key)); | |||
#endif | ||||
GNUNET_STATISTICS_update (GDS_stats, | GNUNET_STATISTICS_update (GDS_stats, | |||
gettext_noop | gettext_noop | |||
("# Key match, type mismatches in REPLY to CL IENT"), | ("# Key match, type mismatches in REPLY to CL IENT"), | |||
1, GNUNET_NO); | 1, GNUNET_NO); | |||
return GNUNET_YES; /* type mismatch */ | return GNUNET_YES; /* type mismatch */ | |||
} | } | |||
GNUNET_CRYPTO_hash (frc->data, frc->data_size, &ch); | GNUNET_CRYPTO_hash (frc->data, frc->data_size, &ch); | |||
for (i = 0; i < record->seen_replies_count; i++) | for (i = 0; i < record->seen_replies_count; i++) | |||
if (0 == memcmp (&record->seen_replies[i], &ch, sizeof (GNUNET_HashCode ))) | if (0 == memcmp (&record->seen_replies[i], &ch, sizeof (GNUNET_HashCode ))) | |||
{ | { | |||
#if DEBUG_DHT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
"Duplicate reply, not passing request for key %s to local client\n", | "Duplicate reply, not passing request for key %s to local client\n", | |||
GNUNET_h2s (key)); | GNUNET_h2s (key)); | |||
#endif | ||||
GNUNET_STATISTICS_update (GDS_stats, | GNUNET_STATISTICS_update (GDS_stats, | |||
gettext_noop | gettext_noop | |||
("# Duplicate REPLIES to CLIENT request dro pped"), | ("# Duplicate REPLIES to CLIENT request dro pped"), | |||
1, GNUNET_NO); | 1, GNUNET_NO); | |||
return GNUNET_YES; /* duplicate */ | return GNUNET_YES; /* duplicate */ | |||
} | } | |||
eval = | eval = | |||
GNUNET_BLOCK_evaluate (GDS_block_context, record->type, key, NULL, 0, | GNUNET_BLOCK_evaluate (GDS_block_context, record->type, key, NULL, 0, | |||
record->xquery, record->xquery_size, frc->data , | record->xquery, record->xquery_size, frc->data , | |||
frc->data_size); | frc->data_size); | |||
#if DEBUG_DHT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
"Evaluation result is %d for key %s for local client's query\ n", | "Evaluation result is %d for key %s for local client's query\ n", | |||
(int) eval, GNUNET_h2s (key)); | (int) eval, GNUNET_h2s (key)); | |||
#endif | ||||
switch (eval) | switch (eval) | |||
{ | { | |||
case GNUNET_BLOCK_EVALUATION_OK_LAST: | case GNUNET_BLOCK_EVALUATION_OK_LAST: | |||
do_free = GNUNET_YES; | do_free = GNUNET_YES; | |||
break; | break; | |||
case GNUNET_BLOCK_EVALUATION_OK_MORE: | case GNUNET_BLOCK_EVALUATION_OK_MORE: | |||
GNUNET_array_append (record->seen_replies, record->seen_replies_count, ch); | GNUNET_array_append (record->seen_replies, record->seen_replies_count, ch); | |||
do_free = GNUNET_NO; | do_free = GNUNET_NO; | |||
break; | break; | |||
case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: | case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: | |||
skipping to change at line 910 | skipping to change at line 971 | |||
ntohs (frc->pm->msg->size)); | ntohs (frc->pm->msg->size)); | |||
memcpy (pm, frc->pm, | memcpy (pm, frc->pm, | |||
sizeof (struct PendingMessage) + ntohs (frc->pm->msg->size)); | sizeof (struct PendingMessage) + ntohs (frc->pm->msg->size)); | |||
pm->next = pm->prev = NULL; | pm->next = pm->prev = NULL; | |||
} | } | |||
GNUNET_STATISTICS_update (GDS_stats, | GNUNET_STATISTICS_update (GDS_stats, | |||
gettext_noop ("# RESULTS queued for clients"), 1, | gettext_noop ("# RESULTS queued for clients"), 1, | |||
GNUNET_NO); | GNUNET_NO); | |||
reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1]; | reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1]; | |||
reply->unique_id = record->unique_id; | reply->unique_id = record->unique_id; | |||
#if DEBUG_DHT | ||||
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, | |||
"Queueing reply to query %s for client %p\n", GNUNET_h2s (key ), | "Queueing reply to query %s for client %p\n", GNUNET_h2s (key ), | |||
record->client->client_handle); | record->client->client_handle); | |||
#endif | ||||
add_pending_message (record->client, pm); | add_pending_message (record->client, pm); | |||
if (GNUNET_YES == do_free) | if (GNUNET_YES == do_free) | |||
remove_client_records (record->client, key, record); | remove_client_records (record->client, key, record); | |||
return GNUNET_YES; | return GNUNET_YES; | |||
} | } | |||
/** | /** | |||
* Handle a reply we've received from another peer. If the reply | * Handle a reply we've received from another peer. If the reply | |||
* matches any of our pending queries, forward it to the respective | * matches any of our pending queries, forward it to the respective | |||
* client(s). | * client(s). | |||
skipping to change at line 1006 | skipping to change at line 1065 | |||
/* did not match any of the requests, free! */ | /* did not match any of the requests, free! */ | |||
GNUNET_STATISTICS_update (GDS_stats, | GNUNET_STATISTICS_update (GDS_stats, | |||
gettext_noop | gettext_noop | |||
("# REPLIES ignored for CLIENTS (no match)"), 1, | ("# REPLIES ignored for CLIENTS (no match)"), 1, | |||
GNUNET_NO); | GNUNET_NO); | |||
GNUNET_free (pm); | GNUNET_free (pm); | |||
} | } | |||
} | } | |||
/** | /** | |||
* Check if some client is monitoring messages of this type and notify | * Check if some client is monitoring GET messages and notify | |||
* him in that case. | * them in that case. | |||
* | * | |||
* @param mtype Type of the DHT message. | * @param options Options, for instance RecordRoute, DemultiplexEverywhere. | |||
* @param exp When will this value expire. | * @param type The type of data in the request. | |||
* @param key Key of the result/request. | * @param hop_count Hop count so far. | |||
* @param putl number of entries in get_path. | * @param path_length number of entries in path (or 0 if not recorded). | |||
* @param put_path peers on the PUT path (or NULL if not recorded). | * @param path peers on the GET path (or NULL if not recorded). | |||
* @param getl number of entries in get_path. | ||||
* @param get_path Peers on reply path (or NULL if not recorded). | ||||
* @param desired_replication_level Desired replication level. | * @param desired_replication_level Desired replication level. | |||
* @param type Type of the result/request. | * @param key Key of the requested data. | |||
*/ | ||||
void | ||||
GDS_CLIENTS_process_get (uint32_t options, | ||||
enum GNUNET_BLOCK_Type type, | ||||
uint32_t hop_count, | ||||
uint32_t desired_replication_level, | ||||
unsigned int path_length, | ||||
const struct GNUNET_PeerIdentity *path, | ||||
const GNUNET_HashCode * key) | ||||
{ | ||||
struct ClientMonitorRecord *m; | ||||
struct ClientList **cl; | ||||
unsigned int cl_size; | ||||
cl = NULL; | ||||
cl_size = 0; | ||||
for (m = monitor_head; NULL != m; m = m->next) | ||||
{ | ||||
if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) && | ||||
(NULL == m->key || | ||||
memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0)) | ||||
{ | ||||
struct PendingMessage *pm; | ||||
struct GNUNET_DHT_MonitorGetMessage *mmsg; | ||||
struct GNUNET_PeerIdentity *msg_path; | ||||
size_t msize; | ||||
unsigned int i; | ||||
/* Don't send duplicates */ | ||||
for (i = 0; i < cl_size; i++) | ||||
if (cl[i] == m->client) | ||||
break; | ||||
if (i < cl_size) | ||||
continue; | ||||
GNUNET_array_append (cl, cl_size, m->client); | ||||
msize = path_length * sizeof (struct GNUNET_PeerIdentity); | ||||
msize += sizeof (struct GNUNET_DHT_MonitorGetMessage); | ||||
msize += sizeof (struct PendingMessage); | ||||
pm = (struct PendingMessage *) GNUNET_malloc (msize); | ||||
mmsg = (struct GNUNET_DHT_MonitorGetMessage *) &pm[1]; | ||||
pm->msg = &mmsg->header; | ||||
mmsg->header.size = htons (msize - sizeof (struct PendingMessage)); | ||||
mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET); | ||||
mmsg->options = htonl(options); | ||||
mmsg->type = htonl(type); | ||||
mmsg->hop_count = htonl(hop_count); | ||||
mmsg->desired_replication_level = htonl(desired_replication_level); | ||||
mmsg->get_path_length = htonl(path_length); | ||||
memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode)); | ||||
msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1]; | ||||
if (path_length > 0) | ||||
memcpy (msg_path, path, | ||||
path_length * sizeof (struct GNUNET_PeerIdentity)); | ||||
add_pending_message (m->client, pm); | ||||
} | ||||
} | ||||
GNUNET_free_non_null (cl); | ||||
} | ||||
/** | ||||
* Check if some client is monitoring GET RESP messages and notify | ||||
* them in that case. | ||||
* | ||||
* @param type The type of data in the result. | ||||
* @param get_path Peers on GET path (or NULL if not recorded). | ||||
* @param get_path_length number of entries in get_path. | ||||
* @param put_path peers on the PUT path (or NULL if not recorded). | ||||
* @param put_path_length number of entries in get_path. | ||||
* @param exp Expiration time of the data. | ||||
* @param key Key of the data. | ||||
* @param data Pointer to the result data. | * @param data Pointer to the result data. | |||
* @param size Number of bytes in data. | * @param size Number of bytes in data. | |||
*/ | */ | |||
void | void | |||
GDS_CLIENTS_process_monitor (uint16_t mtype, | GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type, | |||
const struct GNUNET_TIME_Absolute exp, | const struct GNUNET_PeerIdentity *get_path, | |||
const GNUNET_HashCode *key, | unsigned int get_path_length, | |||
uint32_t putl, | const struct GNUNET_PeerIdentity *put_path, | |||
const struct GNUNET_PeerIdentity *put_path, | unsigned int put_path_length, | |||
uint32_t getl, | struct GNUNET_TIME_Absolute exp, | |||
const struct GNUNET_PeerIdentity *get_path, | const GNUNET_HashCode * key, | |||
uint32_t desired_replication_level, | const void *data, | |||
enum GNUNET_BLOCK_Type type, | size_t size) | |||
const struct GNUNET_MessageHeader *data, | ||||
uint16_t size) | ||||
{ | { | |||
struct ClientMonitorRecord *m; | struct ClientMonitorRecord *m; | |||
struct ClientList **cl; | struct ClientList **cl; | |||
unsigned int cl_size; | unsigned int cl_size; | |||
cl = NULL; | cl = NULL; | |||
cl_size = 0; | cl_size = 0; | |||
for (m = monitor_head; NULL != m; m = m->next) | for (m = monitor_head; NULL != m; m = m->next) | |||
{ | { | |||
if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) && | if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) && | |||
(NULL == m->key || | (NULL == m->key || | |||
memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0)) | memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0)) | |||
{ | { | |||
struct PendingMessage *pm; | struct PendingMessage *pm; | |||
struct GNUNET_DHT_MonitorMessage *mmsg; | struct GNUNET_DHT_MonitorGetRespMessage *mmsg; | |||
struct GNUNET_PeerIdentity *path; | struct GNUNET_PeerIdentity *path; | |||
size_t msize; | size_t msize; | |||
unsigned int i; | unsigned int i; | |||
/* Don't send duplicates */ | /* Don't send duplicates */ | |||
for (i = 0; i < cl_size; i++) | for (i = 0; i < cl_size; i++) | |||
if (cl[i] == m->client) | if (cl[i] == m->client) | |||
break; | break; | |||
if (i < cl_size) | if (i < cl_size) | |||
continue; | continue; | |||
GNUNET_array_append (cl, cl_size, m->client); | GNUNET_array_append (cl, cl_size, m->client); | |||
msize = size; | msize = size; | |||
msize += (getl + putl) * sizeof (struct GNUNET_PeerIdentity); | msize += (get_path_length + put_path_length) | |||
msize += sizeof (struct GNUNET_DHT_MonitorMessage); | * sizeof (struct GNUNET_PeerIdentity); | |||
msize += sizeof (struct GNUNET_DHT_MonitorGetRespMessage); | ||||
msize += sizeof (struct PendingMessage); | msize += sizeof (struct PendingMessage); | |||
pm = (struct PendingMessage *) GNUNET_malloc (msize); | pm = (struct PendingMessage *) GNUNET_malloc (msize); | |||
mmsg = (struct GNUNET_DHT_MonitorMessage *) &pm[1]; | mmsg = (struct GNUNET_DHT_MonitorGetRespMessage *) &pm[1]; | |||
pm->msg = (struct GNUNET_MessageHeader *) mmsg; | pm->msg = (struct GNUNET_MessageHeader *) mmsg; | |||
mmsg->header.size = htons (msize - sizeof (struct PendingMessage)); | mmsg->header.size = htons (msize - sizeof (struct PendingMessage)); | |||
mmsg->header.type = htons (mtype); | mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP); | |||
mmsg->expiration = GNUNET_TIME_absolute_hton(exp); | mmsg->type = htonl(type); | |||
memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode)); | mmsg->put_path_length = htonl(put_path_length); | |||
mmsg->put_path_length = htonl(putl); | mmsg->get_path_length = htonl(get_path_length); | |||
mmsg->get_path_length = htonl(getl); | ||||
mmsg->desired_replication_level = htonl (desired_replication_level); | ||||
path = (struct GNUNET_PeerIdentity *) &mmsg[1]; | path = (struct GNUNET_PeerIdentity *) &mmsg[1]; | |||
if (putl > 0) | if (put_path_length > 0) | |||
{ | { | |||
memcpy (path, put_path, putl * sizeof (struct GNUNET_PeerIdentity)) | memcpy (path, put_path, | |||
; | put_path_length * sizeof (struct GNUNET_PeerIdentity)); | |||
path = &path[putl]; | path = &path[put_path_length]; | |||
} | } | |||
if (getl > 0) | if (get_path_length > 0) | |||
memcpy (path, get_path, getl * sizeof (struct GNUNET_PeerIdentity)) | memcpy (path, get_path, | |||
; | get_path_length * sizeof (struct GNUNET_PeerIdentity)); | |||
mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp); | ||||
memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode)); | ||||
if (size > 0) | if (size > 0) | |||
memcpy (&path[getl], data, size); | memcpy (&path[get_path_length], data, size); | |||
add_pending_message (m->client, pm); | ||||
} | ||||
} | ||||
GNUNET_free_non_null (cl); | ||||
} | ||||
/** | ||||
* Check if some client is monitoring PUT messages and notify | ||||
* them in that case. | ||||
* | ||||
* @param options Options, for instance RecordRoute, DemultiplexEverywhere. | ||||
* @param type The type of data in the request. | ||||
* @param hop_count Hop count so far. | ||||
* @param path_length number of entries in path (or 0 if not recorded). | ||||
* @param path peers on the PUT path (or NULL if not recorded). | ||||
* @param desired_replication_level Desired replication level. | ||||
* @param exp Expiration time of the data. | ||||
* @param key Key under which data is to be stored. | ||||
* @param data Pointer to the data carried. | ||||
* @param size Number of bytes in data. | ||||
*/ | ||||
void | ||||
GDS_CLIENTS_process_put (uint32_t options, | ||||
enum GNUNET_BLOCK_Type type, | ||||
uint32_t hop_count, | ||||
uint32_t desired_replication_level, | ||||
unsigned int path_length, | ||||
const struct GNUNET_PeerIdentity *path, | ||||
struct GNUNET_TIME_Absolute exp, | ||||
const GNUNET_HashCode * key, | ||||
const void *data, | ||||
size_t size) | ||||
{ | ||||
struct ClientMonitorRecord *m; | ||||
struct ClientList **cl; | ||||
unsigned int cl_size; | ||||
cl = NULL; | ||||
cl_size = 0; | ||||
for (m = monitor_head; NULL != m; m = m->next) | ||||
{ | ||||
if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) && | ||||
(NULL == m->key || | ||||
memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0)) | ||||
{ | ||||
struct PendingMessage *pm; | ||||
struct GNUNET_DHT_MonitorPutMessage *mmsg; | ||||
struct GNUNET_PeerIdentity *msg_path; | ||||
size_t msize; | ||||
unsigned int i; | ||||
/* Don't send duplicates */ | ||||
for (i = 0; i < cl_size; i++) | ||||
if (cl[i] == m->client) | ||||
break; | ||||
if (i < cl_size) | ||||
continue; | ||||
GNUNET_array_append (cl, cl_size, m->client); | ||||
msize = size; | ||||
msize += path_length * sizeof (struct GNUNET_PeerIdentity); | ||||
msize += sizeof (struct GNUNET_DHT_MonitorPutMessage); | ||||
msize += sizeof (struct PendingMessage); | ||||
pm = (struct PendingMessage *) GNUNET_malloc (msize); | ||||
mmsg = (struct GNUNET_DHT_MonitorPutMessage *) &pm[1]; | ||||
pm->msg = (struct GNUNET_MessageHeader *) mmsg; | ||||
mmsg->header.size = htons (msize - sizeof (struct PendingMessage)); | ||||
mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT); | ||||
mmsg->options = htonl(options); | ||||
mmsg->type = htonl(type); | ||||
mmsg->hop_count = htonl(hop_count); | ||||
mmsg->desired_replication_level = htonl(desired_replication_level); | ||||
mmsg->put_path_length = htonl(path_length); | ||||
msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1]; | ||||
if (path_length > 0) | ||||
{ | ||||
memcpy (msg_path, path, | ||||
path_length * sizeof (struct GNUNET_PeerIdentity)); | ||||
} | ||||
mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp); | ||||
memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode)); | ||||
if (size > 0) | ||||
memcpy (&msg_path[path_length], data, size); | ||||
add_pending_message (m->client, pm); | add_pending_message (m->client, pm); | |||
} | } | |||
} | } | |||
GNUNET_free_non_null (cl); | GNUNET_free_non_null (cl); | |||
} | } | |||
/** | /** | |||
* Initialize client subsystem. | * Initialize client subsystem. | |||
* | * | |||
* @param server the initialized server | * @param server the initialized server | |||
skipping to change at line 1107 | skipping to change at line 1319 | |||
{ | { | |||
static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = { | static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = { | |||
{&handle_dht_local_put, NULL, | {&handle_dht_local_put, NULL, | |||
GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, 0}, | GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, 0}, | |||
{&handle_dht_local_get, NULL, | {&handle_dht_local_get, NULL, | |||
GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, 0}, | GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, 0}, | |||
{&handle_dht_local_get_stop, NULL, | {&handle_dht_local_get_stop, NULL, | |||
GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, | GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, | |||
sizeof (struct GNUNET_DHT_ClientGetStopMessage)}, | sizeof (struct GNUNET_DHT_ClientGetStopMessage)}, | |||
{&handle_dht_local_monitor, NULL, | {&handle_dht_local_monitor, NULL, | |||
GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET, | GNUNET_MESSAGE_TYPE_DHT_MONITOR_START, | |||
sizeof (struct GNUNET_DHT_MonitorMessage)}, | sizeof (struct GNUNET_DHT_MonitorStartStopMessage)}, | |||
{&handle_dht_local_monitor_stop, NULL, | ||||
GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP, | ||||
sizeof (struct GNUNET_DHT_MonitorStartStopMessage)}, | ||||
{NULL, NULL, 0, 0} | {NULL, NULL, 0, 0} | |||
}; | }; | |||
forward_map = GNUNET_CONTAINER_multihashmap_create (1024); | forward_map = GNUNET_CONTAINER_multihashmap_create (1024); | |||
retry_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MI N); | retry_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MI N); | |||
GNUNET_SERVER_add_handlers (server, plugin_handlers); | GNUNET_SERVER_add_handlers (server, plugin_handlers); | |||
GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL) ; | GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL) ; | |||
} | } | |||
/** | /** | |||
* Shutdown client subsystem. | * Shutdown client subsystem. | |||
skipping to change at line 1130 | skipping to change at line 1345 | |||
void | void | |||
GDS_CLIENTS_done () | GDS_CLIENTS_done () | |||
{ | { | |||
GNUNET_assert (client_head == NULL); | GNUNET_assert (client_head == NULL); | |||
GNUNET_assert (client_tail == NULL); | GNUNET_assert (client_tail == NULL); | |||
if (GNUNET_SCHEDULER_NO_TASK != retry_task) | if (GNUNET_SCHEDULER_NO_TASK != retry_task) | |||
{ | { | |||
GNUNET_SCHEDULER_cancel (retry_task); | GNUNET_SCHEDULER_cancel (retry_task); | |||
retry_task = GNUNET_SCHEDULER_NO_TASK; | retry_task = GNUNET_SCHEDULER_NO_TASK; | |||
} | } | |||
GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap)); | if (NULL != retry_heap) | |||
GNUNET_CONTAINER_heap_destroy (retry_heap); | { | |||
retry_heap = NULL; | GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap)); | |||
GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map)); | GNUNET_CONTAINER_heap_destroy (retry_heap); | |||
GNUNET_CONTAINER_multihashmap_destroy (forward_map); | retry_heap = NULL; | |||
forward_map = NULL; | } | |||
if (NULL != forward_map) | ||||
{ | ||||
GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map)); | ||||
GNUNET_CONTAINER_multihashmap_destroy (forward_map); | ||||
forward_map = NULL; | ||||
} | ||||
} | } | |||
/* end of gnunet-service-dht_clients.c */ | /* end of gnunet-service-dht_clients.c */ | |||
End of changes. 61 change blocks. | ||||
108 lines changed or deleted | 333 lines changed or added | |||
This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/ |