gnunet-service-dht_clients.c   gnunet-service-dht_clients.c 
skipping to change at line 88 skipping to change at line 88
/** /**
* The handle to this client * The handle to this client
*/ */
struct GNUNET_SERVER_Client *client_handle; struct GNUNET_SERVER_Client *client_handle;
/** /**
* Handle to the current transmission request, NULL * Handle to the current transmission request, NULL
* if none pending. * if none pending.
*/ */
struct GNUNET_CONNECTION_TransmitHandle *transmit_handle; struct GNUNET_SERVER_TransmitHandle *transmit_handle;
/** /**
* Linked list of pending messages for this client * Linked list of pending messages for this client
*/ */
struct PendingMessage *pending_head; struct PendingMessage *pending_head;
/** /**
* Tail of linked list of pending messages for this client * Tail of linked list of pending messages for this client
*/ */
struct PendingMessage *pending_tail; struct PendingMessage *pending_tail;
skipping to change at line 203 skipping to change at line 203
* Type of blocks that are of interest * Type of blocks that are of interest
*/ */
enum GNUNET_BLOCK_Type type; enum GNUNET_BLOCK_Type type;
/** /**
* Key of data of interest, NULL for all. * Key of data of interest, NULL for all.
*/ */
GNUNET_HashCode *key; GNUNET_HashCode *key;
/** /**
* Flag whether to notify about GET messages.
*/
int16_t get;
/**
* Flag whether to notify about GET_REPONSE messages.
*/
int16_t get_resp;
/**
* Flag whether to notify about PUT messages.
*/
uint16_t put;
/**
* Client to notify of these requests. * Client to notify of these requests.
*/ */
struct ClientList *client; struct ClientList *client;
}; };
/** /**
* List of active clients. * List of active clients.
*/ */
static struct ClientList *client_head; static struct ClientList *client_head;
skipping to change at line 244 skipping to change at line 259
* Heap with all of our client's request, sorted by retry time (earliest on top). * Heap with all of our client's request, sorted by retry time (earliest on top).
*/ */
static struct GNUNET_CONTAINER_Heap *retry_heap; static struct GNUNET_CONTAINER_Heap *retry_heap;
/** /**
* Task that re-transmits requests (using retry_heap). * Task that re-transmits requests (using retry_heap).
*/ */
static GNUNET_SCHEDULER_TaskIdentifier retry_task; static GNUNET_SCHEDULER_TaskIdentifier retry_task;
/** /**
* Task run to check for messages that need to be sent to a client.
*
* @param client a ClientList, containing the client and any messages to be
sent to it
*/
static void
process_pending_messages (struct ClientList *client);
/**
* Add a PendingMessage to the clients list of messages to be sent
*
* @param client the active client to send the message to
* @param pending_message the actual message to send
*/
static void
add_pending_message (struct ClientList *client,
struct PendingMessage *pending_message)
{
GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_t
ail,
pending_message);
process_pending_messages (client);
}
/**
* Find a client if it exists, add it otherwise. * Find a client if it exists, add it otherwise.
* *
* @param client the server handle to the client * @param client the server handle to the client
* *
* @return the client if found, a new client otherwise * @return the client if found, a new client otherwise
*/ */
static struct ClientList * static struct ClientList *
find_active_client (struct GNUNET_SERVER_Client *client) find_active_client (struct GNUNET_SERVER_Client *client)
{ {
struct ClientList *pos = client_head; struct ClientList *pos = client_head;
skipping to change at line 285 skipping to change at line 323
* @return GNUNET_YES (we should continue to iterate) * @return GNUNET_YES (we should continue to iterate)
*/ */
static int static int
remove_client_records (void *cls, const GNUNET_HashCode * key, void *value) remove_client_records (void *cls, const GNUNET_HashCode * key, void *value)
{ {
struct ClientList *client = cls; struct ClientList *client = cls;
struct ClientQueryRecord *record = value; struct ClientQueryRecord *record = value;
if (record->client != client) if (record->client != client)
return GNUNET_YES; return GNUNET_YES;
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Removing client %p's record for key %s\n", client, "Removing client %p's record for key %s\n", client,
GNUNET_h2s (key)); GNUNET_h2s (key));
#endif
GNUNET_assert (GNUNET_YES == GNUNET_assert (GNUNET_YES ==
GNUNET_CONTAINER_multihashmap_remove (forward_map, key, GNUNET_CONTAINER_multihashmap_remove (forward_map, key,
record)); record));
if (NULL != record->hnode) if (NULL != record->hnode)
GNUNET_CONTAINER_heap_remove_node (record->hnode); GNUNET_CONTAINER_heap_remove_node (record->hnode);
GNUNET_array_grow (record->seen_replies, record->seen_replies_count, 0); GNUNET_array_grow (record->seen_replies, record->seen_replies_count, 0);
GNUNET_free (record); GNUNET_free (record);
return GNUNET_YES; return GNUNET_YES;
} }
skipping to change at line 315 skipping to change at line 351
* @param client identification of the client; NULL * @param client identification of the client; NULL
* for the last call when the server is destroyed * for the last call when the server is destroyed
*/ */
static void static void
handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client) handle_client_disconnect (void *cls, struct GNUNET_SERVER_Client *client)
{ {
struct ClientList *pos; struct ClientList *pos;
struct PendingMessage *reply; struct PendingMessage *reply;
struct ClientMonitorRecord *monitor; struct ClientMonitorRecord *monitor;
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Local client %p disconnects\n", cli ent); GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Local client %p disconnects\n", cli ent);
#endif
pos = find_active_client (client); pos = find_active_client (client);
GNUNET_CONTAINER_DLL_remove (client_head, client_tail, pos); GNUNET_CONTAINER_DLL_remove (client_head, client_tail, pos);
if (pos->transmit_handle != NULL) if (pos->transmit_handle != NULL)
GNUNET_CONNECTION_notify_transmit_ready_cancel (pos->transmit_handle); GNUNET_SERVER_notify_transmit_ready_cancel (pos->transmit_handle);
while (NULL != (reply = pos->pending_head)) while (NULL != (reply = pos->pending_head))
{ {
GNUNET_CONTAINER_DLL_remove (pos->pending_head, pos->pending_tail, repl y); GNUNET_CONTAINER_DLL_remove (pos->pending_head, pos->pending_tail, repl y);
GNUNET_free (reply); GNUNET_free (reply);
} }
monitor = monitor_head; monitor = monitor_head;
while (NULL != monitor) while (NULL != monitor)
{ {
if (monitor->client == pos) if (monitor->client == pos)
{ {
skipping to change at line 441 skipping to change at line 475
* @param client the client we received this message from * @param client the client we received this message from
* @param message the actual message received * @param message the actual message received
*/ */
static void static void
handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client, handle_dht_local_put (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *message) const struct GNUNET_MessageHeader *message)
{ {
const struct GNUNET_DHT_ClientPutMessage *dht_msg; const struct GNUNET_DHT_ClientPutMessage *dht_msg;
struct GNUNET_CONTAINER_BloomFilter *peer_bf; struct GNUNET_CONTAINER_BloomFilter *peer_bf;
uint16_t size; uint16_t size;
struct PendingMessage *pm;
struct GNUNET_DHT_ClientPutConfirmationMessage *conf;
size = ntohs (message->size); size = ntohs (message->size);
if (size < sizeof (struct GNUNET_DHT_ClientPutMessage)) if (size < sizeof (struct GNUNET_DHT_ClientPutMessage))
{ {
GNUNET_break (0); GNUNET_break (0);
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return; return;
} }
GNUNET_STATISTICS_update (GDS_stats, GNUNET_STATISTICS_update (GDS_stats,
gettext_noop gettext_noop
("# PUT requests received from clients"), 1, ("# PUT requests received from clients"), 1,
GNUNET_NO); GNUNET_NO);
dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message; dht_msg = (const struct GNUNET_DHT_ClientPutMessage *) message;
/* give to local clients */ /* give to local clients */
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Handling local PUT of %u-bytes for query %s\n", "Handling local PUT of %u-bytes for query %s\n",
size - sizeof (struct GNUNET_DHT_ClientPutMessage), size - sizeof (struct GNUNET_DHT_ClientPutMessage),
GNUNET_h2s (&dht_msg->key)); GNUNET_h2s (&dht_msg->key));
#endif
GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration) , GDS_CLIENTS_handle_reply (GNUNET_TIME_absolute_ntoh (dht_msg->expiration) ,
&dht_msg->key, 0, NULL, 0, NULL, &dht_msg->key, 0, NULL, 0, NULL,
ntohl (dht_msg->type), ntohl (dht_msg->type),
size - sizeof (struct GNUNET_DHT_ClientPutMessa ge), size - sizeof (struct GNUNET_DHT_ClientPutMessa ge),
&dht_msg[1]); &dht_msg[1]);
/* store locally */ /* store locally */
GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration) , GDS_DATACACHE_handle_put (GNUNET_TIME_absolute_ntoh (dht_msg->expiration) ,
&dht_msg->key, 0, NULL, ntohl (dht_msg->type), &dht_msg->key, 0, NULL, ntohl (dht_msg->type),
size - sizeof (struct GNUNET_DHT_ClientPutMessa ge), size - sizeof (struct GNUNET_DHT_ClientPutMessa ge),
&dht_msg[1]); &dht_msg[1]);
skipping to change at line 482 skipping to change at line 516
peer_bf = peer_bf =
GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE, GNUNET_CONTAINER_bloomfilter_init (NULL, DHT_BLOOM_SIZE,
GNUNET_CONSTANTS_BLOOMFILTER_K); GNUNET_CONSTANTS_BLOOMFILTER_K);
GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type), ntohl (dht_msg->options ), GDS_NEIGHBOURS_handle_put (ntohl (dht_msg->type), ntohl (dht_msg->options ),
ntohl (dht_msg->desired_replication_level), ntohl (dht_msg->desired_replication_level),
GNUNET_TIME_absolute_ntoh (dht_msg->expiration ), GNUNET_TIME_absolute_ntoh (dht_msg->expiration ),
0 /* hop count */ , 0 /* hop count */ ,
peer_bf, &dht_msg->key, 0, NULL, &dht_msg[1], peer_bf, &dht_msg->key, 0, NULL, &dht_msg[1],
size - size -
sizeof (struct GNUNET_DHT_ClientPutMessage)); sizeof (struct GNUNET_DHT_ClientPutMessage));
GDS_CLIENTS_process_put (ntohl (dht_msg->options),
ntohl (dht_msg->type),
0,
ntohl (dht_msg->desired_replication_level),
1,
GDS_NEIGHBOURS_get_id(),
GNUNET_TIME_absolute_ntoh (dht_msg->expiration),
&dht_msg->key,
&dht_msg[1],
size - sizeof (struct GNUNET_DHT_ClientPutMessag
e));
GNUNET_CONTAINER_bloomfilter_free (peer_bf); GNUNET_CONTAINER_bloomfilter_free (peer_bf);
pm = GNUNET_malloc (sizeof (struct PendingMessage) +
sizeof (struct GNUNET_DHT_ClientPutConfirmationMessage
));
conf = (struct GNUNET_DHT_ClientPutConfirmationMessage *) &pm[1];
conf->header.size = htons (sizeof (struct GNUNET_DHT_ClientPutConfirmatio
nMessage));
conf->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT_OK);
conf->reserved = htonl (0);
conf->unique_id = dht_msg->unique_id;
pm->msg = &conf->header;
add_pending_message (find_active_client (client), pm);
GNUNET_SERVER_receive_done (client, GNUNET_OK); GNUNET_SERVER_receive_done (client, GNUNET_OK);
} }
/** /**
* Handler for any generic DHT messages, calls the appropriate handler * Handler for any generic DHT messages, calls the appropriate handler
* depending on message type, sends confirmation if responses aren't otherw ise * depending on message type, sends confirmation if responses aren't otherw ise
* expected. * expected.
* *
* @param cls closure for the service * @param cls closure for the service
* @param client the client we received this message from * @param client the client we received this message from
skipping to change at line 519 skipping to change at line 572
GNUNET_SERVER_receive_done (client, GNUNET_SYSERR); GNUNET_SERVER_receive_done (client, GNUNET_SYSERR);
return; return;
} }
xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage); xquery_size = size - sizeof (struct GNUNET_DHT_ClientGetMessage);
get = (const struct GNUNET_DHT_ClientGetMessage *) message; get = (const struct GNUNET_DHT_ClientGetMessage *) message;
xquery = (const char *) &get[1]; xquery = (const char *) &get[1];
GNUNET_STATISTICS_update (GDS_stats, GNUNET_STATISTICS_update (GDS_stats,
gettext_noop gettext_noop
("# GET requests received from clients"), 1, ("# GET requests received from clients"), 1,
GNUNET_NO); GNUNET_NO);
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Received request for %s from local client %p\n", "Received request for %s from local client %p\n",
GNUNET_h2s (&get->key), client); GNUNET_h2s (&get->key), client);
#endif
cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size); cqr = GNUNET_malloc (sizeof (struct ClientQueryRecord) + xquery_size);
cqr->key = get->key; cqr->key = get->key;
cqr->client = find_active_client (client); cqr->client = find_active_client (client);
cqr->xquery = (void *) &cqr[1]; cqr->xquery = (void *) &cqr[1];
memcpy (&cqr[1], xquery, xquery_size); memcpy (&cqr[1], xquery, xquery_size);
cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0); cqr->hnode = GNUNET_CONTAINER_heap_insert (retry_heap, cqr, 0);
cqr->retry_frequency = GNUNET_TIME_UNIT_MILLISECONDS; cqr->retry_frequency = GNUNET_TIME_UNIT_MILLISECONDS;
cqr->retry_time = GNUNET_TIME_absolute_get (); cqr->retry_time = GNUNET_TIME_absolute_get ();
cqr->unique_id = get->unique_id; cqr->unique_id = get->unique_id;
cqr->xquery_size = xquery_size; cqr->xquery_size = xquery_size;
cqr->replication = ntohl (get->desired_replication_level); cqr->replication = ntohl (get->desired_replication_level);
cqr->msg_options = ntohl (get->options); cqr->msg_options = ntohl (get->options);
cqr->type = ntohl (get->type); cqr->type = ntohl (get->type);
GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr, GNUNET_CONTAINER_multihashmap_put (forward_map, &get->key, cqr,
GNUNET_CONTAINER_MULTIHASHMAPOPTION_MU LTIPLE); GNUNET_CONTAINER_MULTIHASHMAPOPTION_MU LTIPLE);
GDS_CLIENTS_process_get (ntohl (get->options),
ntohl (get->type),
0,
ntohl (get->desired_replication_level),
1,
GDS_NEIGHBOURS_get_id(),
&get->key);
/* start remote requests */ /* start remote requests */
if (GNUNET_SCHEDULER_NO_TASK != retry_task) if (GNUNET_SCHEDULER_NO_TASK != retry_task)
GNUNET_SCHEDULER_cancel (retry_task); GNUNET_SCHEDULER_cancel (retry_task);
retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL) ; retry_task = GNUNET_SCHEDULER_add_now (&transmit_next_request_task, NULL) ;
/* perform local lookup */ /* perform local lookup */
GDS_DATACACHE_handle_get (&get->key, cqr->type, cqr->xquery, xquery_size, GDS_DATACACHE_handle_get (&get->key, cqr->type, cqr->xquery, xquery_size,
NULL, 0); NULL, 0);
GNUNET_SERVER_receive_done (client, GNUNET_OK); GNUNET_SERVER_receive_done (client, GNUNET_OK);
} }
skipping to change at line 582 skipping to change at line 640
* @return GNUNET_YES (we should continue to iterate) * @return GNUNET_YES (we should continue to iterate)
*/ */
static int static int
remove_by_unique_id (void *cls, const GNUNET_HashCode * key, void *value) remove_by_unique_id (void *cls, const GNUNET_HashCode * key, void *value)
{ {
const struct RemoveByUniqueIdContext *ctx = cls; const struct RemoveByUniqueIdContext *ctx = cls;
struct ClientQueryRecord *record = value; struct ClientQueryRecord *record = value;
if (record->unique_id != ctx->unique_id) if (record->unique_id != ctx->unique_id)
return GNUNET_YES; return GNUNET_YES;
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Removing client %p's record for key %s (by unique id)\n", "Removing client %p's record for key %s (by unique id)\n",
ctx->client->client_handle, GNUNET_h2s (key)); ctx->client->client_handle, GNUNET_h2s (key));
#endif
return remove_client_records (ctx->client, key, record); return remove_client_records (ctx->client, key, record);
} }
/** /**
* Handler for any generic DHT stop messages, calls the appropriate handler * Handler for any generic DHT stop messages, calls the appropriate handler
* depending on message type (if processed locally) * depending on message type (if processed locally)
* *
* @param cls closure for the service * @param cls closure for the service
* @param client the client we received this message from * @param client the client we received this message from
* @param message the actual message received * @param message the actual message received
skipping to change at line 611 skipping to change at line 667
const struct GNUNET_MessageHeader *message) const struct GNUNET_MessageHeader *message)
{ {
const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg = const struct GNUNET_DHT_ClientGetStopMessage *dht_stop_msg =
(const struct GNUNET_DHT_ClientGetStopMessage *) message; (const struct GNUNET_DHT_ClientGetStopMessage *) message;
struct RemoveByUniqueIdContext ctx; struct RemoveByUniqueIdContext ctx;
GNUNET_STATISTICS_update (GDS_stats, GNUNET_STATISTICS_update (GDS_stats,
gettext_noop gettext_noop
("# GET STOP requests received from clients"), 1, ("# GET STOP requests received from clients"), 1,
GNUNET_NO); GNUNET_NO);
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p stopped request for key % s\n", GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Client %p stopped request for key % s\n",
client, GNUNET_h2s (&dht_stop_msg->key)); client, GNUNET_h2s (&dht_stop_msg->key));
#endif
ctx.client = find_active_client (client); ctx.client = find_active_client (client);
ctx.unique_id = dht_stop_msg->unique_id; ctx.unique_id = dht_stop_msg->unique_id;
GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, &dht_stop_msg->k ey, GNUNET_CONTAINER_multihashmap_get_multiple (forward_map, &dht_stop_msg->k ey,
&remove_by_unique_id, &ctx); &remove_by_unique_id, &ctx);
GNUNET_SERVER_receive_done (client, GNUNET_OK); GNUNET_SERVER_receive_done (client, GNUNET_OK);
} }
/** /**
* Handler for monitor messages * Handler for monitor start messages
* *
* @param cls closure for the service * @param cls closure for the service
* @param client the client we received this message from * @param client the client we received this message from
* @param message the actual message received * @param message the actual message received
* *
*/ */
static void static void
handle_dht_local_monitor (void *cls, struct GNUNET_SERVER_Client *client, handle_dht_local_monitor (void *cls, struct GNUNET_SERVER_Client *client,
const struct GNUNET_MessageHeader *message) const struct GNUNET_MessageHeader *message)
{ {
struct ClientMonitorRecord *r; struct ClientMonitorRecord *r;
const struct GNUNET_DHT_MonitorMessage *msg; const struct GNUNET_DHT_MonitorStartStopMessage *msg;
unsigned int i;
char *c;
msg = (struct GNUNET_DHT_MonitorMessage *) message; msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message;
r = GNUNET_malloc (sizeof(struct ClientMonitorRecord)); r = GNUNET_malloc (sizeof(struct ClientMonitorRecord));
r->client = find_active_client(client); r->client = find_active_client(client);
r->type = ntohl(msg->type); r->type = ntohl(msg->type);
c = (char *) &msg->key; r->get = ntohs(msg->get);
for (i = 0; i < sizeof (GNUNET_HashCode) && c[i] == 0; i++); r->get_resp = ntohs(msg->get_resp);
if (sizeof (GNUNET_HashCode) == i) r->put = ntohs(msg->put);
r->key = NULL; if (0 == ntohs(msg->filter_key))
r->key = NULL;
else else
{ {
r->key = GNUNET_malloc (sizeof (GNUNET_HashCode)); r->key = GNUNET_malloc (sizeof (GNUNET_HashCode));
memcpy (r->key, &msg->key, sizeof (GNUNET_HashCode)); memcpy (r->key, &msg->key, sizeof (GNUNET_HashCode));
} }
GNUNET_CONTAINER_DLL_insert (monitor_head, monitor_tail, r); GNUNET_CONTAINER_DLL_insert (monitor_head, monitor_tail, r);
// FIXME add remove somewhere
GNUNET_SERVER_receive_done (client, GNUNET_OK); GNUNET_SERVER_receive_done (client, GNUNET_OK);
} }
/** /**
* Task run to check for messages that need to be sent to a client. * Handler for monitor stop messages
*
* @param cls closure for the service
* @param client the client we received this message from
* @param message the actual message received
* *
* @param client a ClientList, containing the client and any messages to be sent to it
*/ */
static void static void
process_pending_messages (struct ClientList *client); handle_dht_local_monitor_stop (void *cls, struct GNUNET_SERVER_Client *clie
nt,
const struct GNUNET_MessageHeader *message)
{
struct ClientMonitorRecord *r;
const struct GNUNET_DHT_MonitorStartStopMessage *msg;
int keys_match;
msg = (struct GNUNET_DHT_MonitorStartStopMessage *) message;
r = monitor_head;
while (NULL != r)
{
if (NULL == r->key)
keys_match = (0 == ntohs(msg->filter_key));
else
{
keys_match = (0 != ntohs(msg->filter_key)
&& !memcmp(r->key, &msg->key, sizeof(GNUNET_HashCode)
));
}
if (find_active_client(client) == r->client
&& ntohl(msg->type) == r->type
&& r->get == msg->get
&& r->get_resp == msg->get_resp
&& r->put == msg->put
&& keys_match
)
{
GNUNET_CONTAINER_DLL_remove (monitor_head, monitor_tail, r);
GNUNET_free_non_null (r->key);
GNUNET_free (r);
GNUNET_SERVER_receive_done (client, GNUNET_OK);
return; /* Delete only ONE entry */
}
r = r->next;
}
GNUNET_SERVER_receive_done (client, GNUNET_OK);
}
/** /**
* Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_r eady * Callback called as a result of issuing a GNUNET_SERVER_notify_transmit_r eady
* request. A ClientList is passed as closure, take the head of the list * request. A ClientList is passed as closure, take the head of the list
* and copy it into buf, which has the result of sending the message to the * and copy it into buf, which has the result of sending the message to the
* client. * client.
* *
* @param cls closure to this call * @param cls closure to this call
* @param size maximum number of bytes available to send * @param size maximum number of bytes available to send
* @param buf where to copy the actual message to * @param buf where to copy the actual message to
skipping to change at line 691 skipping to change at line 783
struct ClientList *client = cls; struct ClientList *client = cls;
char *cbuf = buf; char *cbuf = buf;
struct PendingMessage *reply; struct PendingMessage *reply;
size_t off; size_t off;
size_t msize; size_t msize;
client->transmit_handle = NULL; client->transmit_handle = NULL;
if (buf == NULL) if (buf == NULL)
{ {
/* client disconnected */ /* client disconnected */
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Client %p disconnected, pending messages will be discarded \n", "Client %p disconnected, pending messages will be discarded \n",
client->client_handle); client->client_handle);
#endif
return 0; return 0;
} }
off = 0; off = 0;
while ((NULL != (reply = client->pending_head)) && while ((NULL != (reply = client->pending_head)) &&
(size >= off + (msize = ntohs (reply->msg->size)))) (size >= off + (msize = ntohs (reply->msg->size))))
{ {
GNUNET_CONTAINER_DLL_remove (client->pending_head, client->pending_tail , GNUNET_CONTAINER_DLL_remove (client->pending_head, client->pending_tail ,
reply); reply);
memcpy (&cbuf[off], reply->msg, msize); memcpy (&cbuf[off], reply->msg, msize);
GNUNET_free (reply); GNUNET_free (reply);
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u bytes to client % p\n", GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitting %u bytes to client % p\n",
msize, client->client_handle); msize, client->client_handle);
#endif
off += msize; off += msize;
} }
process_pending_messages (client); process_pending_messages (client);
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitted %u/%u bytes to client % p\n", GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, "Transmitted %u/%u bytes to client % p\n",
(unsigned int) off, (unsigned int) size, client->client_handl e); (unsigned int) off, (unsigned int) size, client->client_handl e);
#endif
return off; return off;
} }
/** /**
* Task run to check for messages that need to be sent to a client. * Task run to check for messages that need to be sent to a client.
* *
* @param client a ClientList, containing the client and any messages to be sent to it * @param client a ClientList, containing the client and any messages to be sent to it
*/ */
static void static void
process_pending_messages (struct ClientList *client) process_pending_messages (struct ClientList *client)
{ {
if ((client->pending_head == NULL) || (client->transmit_handle != NULL)) if ((client->pending_head == NULL) || (client->transmit_handle != NULL))
{ {
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Not asking for transmission to %p now: %s\n", "Not asking for transmission to %p now: %s\n",
client->client_handle, client->client_handle,
client->pending_head == client->pending_head ==
NULL ? "no more messages" : "request already pending"); NULL ? "no more messages" : "request already pending");
#endif
return; return;
} }
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Asking for transmission of %u bytes to client %p\n", "Asking for transmission of %u bytes to client %p\n",
ntohs (client->pending_head->msg->size), client->client_handl e); ntohs (client->pending_head->msg->size), client->client_handl e);
#endif
client->transmit_handle = client->transmit_handle =
GNUNET_SERVER_notify_transmit_ready (client->client_handle, GNUNET_SERVER_notify_transmit_ready (client->client_handle,
ntohs (client->pending_head-> ntohs (client->pending_head->
msg->size), msg->size),
GNUNET_TIME_UNIT_FOREVER_REL, GNUNET_TIME_UNIT_FOREVER_REL,
&send_reply_to_client, client); &send_reply_to_client, client);
} }
/** /**
* Add a PendingMessage to the clients list of messages to be sent
*
* @param client the active client to send the message to
* @param pending_message the actual message to send
*/
static void
add_pending_message (struct ClientList *client,
struct PendingMessage *pending_message)
{
GNUNET_CONTAINER_DLL_insert_tail (client->pending_head, client->pending_t
ail,
pending_message);
process_pending_messages (client);
}
/**
* Closure for 'forward_reply' * Closure for 'forward_reply'
*/ */
struct ForwardReplyContext struct ForwardReplyContext
{ {
/** /**
* Actual message to send to matching clients. * Actual message to send to matching clients.
*/ */
struct PendingMessage *pm; struct PendingMessage *pm;
skipping to change at line 825 skipping to change at line 892
struct ClientQueryRecord *record = value; struct ClientQueryRecord *record = value;
struct PendingMessage *pm; struct PendingMessage *pm;
struct GNUNET_DHT_ClientResultMessage *reply; struct GNUNET_DHT_ClientResultMessage *reply;
enum GNUNET_BLOCK_EvaluationResult eval; enum GNUNET_BLOCK_EvaluationResult eval;
int do_free; int do_free;
GNUNET_HashCode ch; GNUNET_HashCode ch;
unsigned int i; unsigned int i;
if ((record->type != GNUNET_BLOCK_TYPE_ANY) && (record->type != frc->type )) if ((record->type != GNUNET_BLOCK_TYPE_ANY) && (record->type != frc->type ))
{ {
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Record type missmatch, not passing request for key %s to l ocal client\n", "Record type missmatch, not passing request for key %s to l ocal client\n",
GNUNET_h2s (key)); GNUNET_h2s (key));
#endif
GNUNET_STATISTICS_update (GDS_stats, GNUNET_STATISTICS_update (GDS_stats,
gettext_noop gettext_noop
("# Key match, type mismatches in REPLY to CL IENT"), ("# Key match, type mismatches in REPLY to CL IENT"),
1, GNUNET_NO); 1, GNUNET_NO);
return GNUNET_YES; /* type mismatch */ return GNUNET_YES; /* type mismatch */
} }
GNUNET_CRYPTO_hash (frc->data, frc->data_size, &ch); GNUNET_CRYPTO_hash (frc->data, frc->data_size, &ch);
for (i = 0; i < record->seen_replies_count; i++) for (i = 0; i < record->seen_replies_count; i++)
if (0 == memcmp (&record->seen_replies[i], &ch, sizeof (GNUNET_HashCode ))) if (0 == memcmp (&record->seen_replies[i], &ch, sizeof (GNUNET_HashCode )))
{ {
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Duplicate reply, not passing request for key %s to local client\n", "Duplicate reply, not passing request for key %s to local client\n",
GNUNET_h2s (key)); GNUNET_h2s (key));
#endif
GNUNET_STATISTICS_update (GDS_stats, GNUNET_STATISTICS_update (GDS_stats,
gettext_noop gettext_noop
("# Duplicate REPLIES to CLIENT request dro pped"), ("# Duplicate REPLIES to CLIENT request dro pped"),
1, GNUNET_NO); 1, GNUNET_NO);
return GNUNET_YES; /* duplicate */ return GNUNET_YES; /* duplicate */
} }
eval = eval =
GNUNET_BLOCK_evaluate (GDS_block_context, record->type, key, NULL, 0, GNUNET_BLOCK_evaluate (GDS_block_context, record->type, key, NULL, 0,
record->xquery, record->xquery_size, frc->data , record->xquery, record->xquery_size, frc->data ,
frc->data_size); frc->data_size);
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Evaluation result is %d for key %s for local client's query\ n", "Evaluation result is %d for key %s for local client's query\ n",
(int) eval, GNUNET_h2s (key)); (int) eval, GNUNET_h2s (key));
#endif
switch (eval) switch (eval)
{ {
case GNUNET_BLOCK_EVALUATION_OK_LAST: case GNUNET_BLOCK_EVALUATION_OK_LAST:
do_free = GNUNET_YES; do_free = GNUNET_YES;
break; break;
case GNUNET_BLOCK_EVALUATION_OK_MORE: case GNUNET_BLOCK_EVALUATION_OK_MORE:
GNUNET_array_append (record->seen_replies, record->seen_replies_count, ch); GNUNET_array_append (record->seen_replies, record->seen_replies_count, ch);
do_free = GNUNET_NO; do_free = GNUNET_NO;
break; break;
case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE: case GNUNET_BLOCK_EVALUATION_OK_DUPLICATE:
skipping to change at line 910 skipping to change at line 971
ntohs (frc->pm->msg->size)); ntohs (frc->pm->msg->size));
memcpy (pm, frc->pm, memcpy (pm, frc->pm,
sizeof (struct PendingMessage) + ntohs (frc->pm->msg->size)); sizeof (struct PendingMessage) + ntohs (frc->pm->msg->size));
pm->next = pm->prev = NULL; pm->next = pm->prev = NULL;
} }
GNUNET_STATISTICS_update (GDS_stats, GNUNET_STATISTICS_update (GDS_stats,
gettext_noop ("# RESULTS queued for clients"), 1, gettext_noop ("# RESULTS queued for clients"), 1,
GNUNET_NO); GNUNET_NO);
reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1]; reply = (struct GNUNET_DHT_ClientResultMessage *) &pm[1];
reply->unique_id = record->unique_id; reply->unique_id = record->unique_id;
#if DEBUG_DHT
GNUNET_log (GNUNET_ERROR_TYPE_DEBUG, GNUNET_log (GNUNET_ERROR_TYPE_DEBUG,
"Queueing reply to query %s for client %p\n", GNUNET_h2s (key ), "Queueing reply to query %s for client %p\n", GNUNET_h2s (key ),
record->client->client_handle); record->client->client_handle);
#endif
add_pending_message (record->client, pm); add_pending_message (record->client, pm);
if (GNUNET_YES == do_free) if (GNUNET_YES == do_free)
remove_client_records (record->client, key, record); remove_client_records (record->client, key, record);
return GNUNET_YES; return GNUNET_YES;
} }
/** /**
* Handle a reply we've received from another peer. If the reply * Handle a reply we've received from another peer. If the reply
* matches any of our pending queries, forward it to the respective * matches any of our pending queries, forward it to the respective
* client(s). * client(s).
skipping to change at line 1006 skipping to change at line 1065
/* did not match any of the requests, free! */ /* did not match any of the requests, free! */
GNUNET_STATISTICS_update (GDS_stats, GNUNET_STATISTICS_update (GDS_stats,
gettext_noop gettext_noop
("# REPLIES ignored for CLIENTS (no match)"), 1, ("# REPLIES ignored for CLIENTS (no match)"), 1,
GNUNET_NO); GNUNET_NO);
GNUNET_free (pm); GNUNET_free (pm);
} }
} }
/** /**
* Check if some client is monitoring messages of this type and notify * Check if some client is monitoring GET messages and notify
* him in that case. * them in that case.
* *
* @param mtype Type of the DHT message. * @param options Options, for instance RecordRoute, DemultiplexEverywhere.
* @param exp When will this value expire. * @param type The type of data in the request.
* @param key Key of the result/request. * @param hop_count Hop count so far.
* @param putl number of entries in get_path. * @param path_length number of entries in path (or 0 if not recorded).
* @param put_path peers on the PUT path (or NULL if not recorded). * @param path peers on the GET path (or NULL if not recorded).
* @param getl number of entries in get_path.
* @param get_path Peers on reply path (or NULL if not recorded).
* @param desired_replication_level Desired replication level. * @param desired_replication_level Desired replication level.
* @param type Type of the result/request. * @param key Key of the requested data.
*/
void
GDS_CLIENTS_process_get (uint32_t options,
enum GNUNET_BLOCK_Type type,
uint32_t hop_count,
uint32_t desired_replication_level,
unsigned int path_length,
const struct GNUNET_PeerIdentity *path,
const GNUNET_HashCode * key)
{
struct ClientMonitorRecord *m;
struct ClientList **cl;
unsigned int cl_size;
cl = NULL;
cl_size = 0;
for (m = monitor_head; NULL != m; m = m->next)
{
if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
(NULL == m->key ||
memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0))
{
struct PendingMessage *pm;
struct GNUNET_DHT_MonitorGetMessage *mmsg;
struct GNUNET_PeerIdentity *msg_path;
size_t msize;
unsigned int i;
/* Don't send duplicates */
for (i = 0; i < cl_size; i++)
if (cl[i] == m->client)
break;
if (i < cl_size)
continue;
GNUNET_array_append (cl, cl_size, m->client);
msize = path_length * sizeof (struct GNUNET_PeerIdentity);
msize += sizeof (struct GNUNET_DHT_MonitorGetMessage);
msize += sizeof (struct PendingMessage);
pm = (struct PendingMessage *) GNUNET_malloc (msize);
mmsg = (struct GNUNET_DHT_MonitorGetMessage *) &pm[1];
pm->msg = &mmsg->header;
mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET);
mmsg->options = htonl(options);
mmsg->type = htonl(type);
mmsg->hop_count = htonl(hop_count);
mmsg->desired_replication_level = htonl(desired_replication_level);
mmsg->get_path_length = htonl(path_length);
memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode));
msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
if (path_length > 0)
memcpy (msg_path, path,
path_length * sizeof (struct GNUNET_PeerIdentity));
add_pending_message (m->client, pm);
}
}
GNUNET_free_non_null (cl);
}
/**
* Check if some client is monitoring GET RESP messages and notify
* them in that case.
*
* @param type The type of data in the result.
* @param get_path Peers on GET path (or NULL if not recorded).
* @param get_path_length number of entries in get_path.
* @param put_path peers on the PUT path (or NULL if not recorded).
* @param put_path_length number of entries in get_path.
* @param exp Expiration time of the data.
* @param key Key of the data.
* @param data Pointer to the result data. * @param data Pointer to the result data.
* @param size Number of bytes in data. * @param size Number of bytes in data.
*/ */
void void
GDS_CLIENTS_process_monitor (uint16_t mtype, GDS_CLIENTS_process_get_resp (enum GNUNET_BLOCK_Type type,
const struct GNUNET_TIME_Absolute exp, const struct GNUNET_PeerIdentity *get_path,
const GNUNET_HashCode *key, unsigned int get_path_length,
uint32_t putl, const struct GNUNET_PeerIdentity *put_path,
const struct GNUNET_PeerIdentity *put_path, unsigned int put_path_length,
uint32_t getl, struct GNUNET_TIME_Absolute exp,
const struct GNUNET_PeerIdentity *get_path, const GNUNET_HashCode * key,
uint32_t desired_replication_level, const void *data,
enum GNUNET_BLOCK_Type type, size_t size)
const struct GNUNET_MessageHeader *data,
uint16_t size)
{ {
struct ClientMonitorRecord *m; struct ClientMonitorRecord *m;
struct ClientList **cl; struct ClientList **cl;
unsigned int cl_size; unsigned int cl_size;
cl = NULL; cl = NULL;
cl_size = 0; cl_size = 0;
for (m = monitor_head; NULL != m; m = m->next) for (m = monitor_head; NULL != m; m = m->next)
{ {
if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) && if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
(NULL == m->key || (NULL == m->key ||
memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0)) memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0))
{ {
struct PendingMessage *pm; struct PendingMessage *pm;
struct GNUNET_DHT_MonitorMessage *mmsg; struct GNUNET_DHT_MonitorGetRespMessage *mmsg;
struct GNUNET_PeerIdentity *path; struct GNUNET_PeerIdentity *path;
size_t msize; size_t msize;
unsigned int i; unsigned int i;
/* Don't send duplicates */ /* Don't send duplicates */
for (i = 0; i < cl_size; i++) for (i = 0; i < cl_size; i++)
if (cl[i] == m->client) if (cl[i] == m->client)
break; break;
if (i < cl_size) if (i < cl_size)
continue; continue;
GNUNET_array_append (cl, cl_size, m->client); GNUNET_array_append (cl, cl_size, m->client);
msize = size; msize = size;
msize += (getl + putl) * sizeof (struct GNUNET_PeerIdentity); msize += (get_path_length + put_path_length)
msize += sizeof (struct GNUNET_DHT_MonitorMessage); * sizeof (struct GNUNET_PeerIdentity);
msize += sizeof (struct GNUNET_DHT_MonitorGetRespMessage);
msize += sizeof (struct PendingMessage); msize += sizeof (struct PendingMessage);
pm = (struct PendingMessage *) GNUNET_malloc (msize); pm = (struct PendingMessage *) GNUNET_malloc (msize);
mmsg = (struct GNUNET_DHT_MonitorMessage *) &pm[1]; mmsg = (struct GNUNET_DHT_MonitorGetRespMessage *) &pm[1];
pm->msg = (struct GNUNET_MessageHeader *) mmsg; pm->msg = (struct GNUNET_MessageHeader *) mmsg;
mmsg->header.size = htons (msize - sizeof (struct PendingMessage)); mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
mmsg->header.type = htons (mtype); mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET_RESP);
mmsg->expiration = GNUNET_TIME_absolute_hton(exp); mmsg->type = htonl(type);
memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode)); mmsg->put_path_length = htonl(put_path_length);
mmsg->put_path_length = htonl(putl); mmsg->get_path_length = htonl(get_path_length);
mmsg->get_path_length = htonl(getl);
mmsg->desired_replication_level = htonl (desired_replication_level);
path = (struct GNUNET_PeerIdentity *) &mmsg[1]; path = (struct GNUNET_PeerIdentity *) &mmsg[1];
if (putl > 0) if (put_path_length > 0)
{ {
memcpy (path, put_path, putl * sizeof (struct GNUNET_PeerIdentity)) memcpy (path, put_path,
; put_path_length * sizeof (struct GNUNET_PeerIdentity));
path = &path[putl]; path = &path[put_path_length];
} }
if (getl > 0) if (get_path_length > 0)
memcpy (path, get_path, getl * sizeof (struct GNUNET_PeerIdentity)) memcpy (path, get_path,
; get_path_length * sizeof (struct GNUNET_PeerIdentity));
mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode));
if (size > 0) if (size > 0)
memcpy (&path[getl], data, size); memcpy (&path[get_path_length], data, size);
add_pending_message (m->client, pm);
}
}
GNUNET_free_non_null (cl);
}
/**
* Check if some client is monitoring PUT messages and notify
* them in that case.
*
* @param options Options, for instance RecordRoute, DemultiplexEverywhere.
* @param type The type of data in the request.
* @param hop_count Hop count so far.
* @param path_length number of entries in path (or 0 if not recorded).
* @param path peers on the PUT path (or NULL if not recorded).
* @param desired_replication_level Desired replication level.
* @param exp Expiration time of the data.
* @param key Key under which data is to be stored.
* @param data Pointer to the data carried.
* @param size Number of bytes in data.
*/
void
GDS_CLIENTS_process_put (uint32_t options,
enum GNUNET_BLOCK_Type type,
uint32_t hop_count,
uint32_t desired_replication_level,
unsigned int path_length,
const struct GNUNET_PeerIdentity *path,
struct GNUNET_TIME_Absolute exp,
const GNUNET_HashCode * key,
const void *data,
size_t size)
{
struct ClientMonitorRecord *m;
struct ClientList **cl;
unsigned int cl_size;
cl = NULL;
cl_size = 0;
for (m = monitor_head; NULL != m; m = m->next)
{
if ((GNUNET_BLOCK_TYPE_ANY == m->type || m->type == type) &&
(NULL == m->key ||
memcmp (key, m->key, sizeof(GNUNET_HashCode)) == 0))
{
struct PendingMessage *pm;
struct GNUNET_DHT_MonitorPutMessage *mmsg;
struct GNUNET_PeerIdentity *msg_path;
size_t msize;
unsigned int i;
/* Don't send duplicates */
for (i = 0; i < cl_size; i++)
if (cl[i] == m->client)
break;
if (i < cl_size)
continue;
GNUNET_array_append (cl, cl_size, m->client);
msize = size;
msize += path_length * sizeof (struct GNUNET_PeerIdentity);
msize += sizeof (struct GNUNET_DHT_MonitorPutMessage);
msize += sizeof (struct PendingMessage);
pm = (struct PendingMessage *) GNUNET_malloc (msize);
mmsg = (struct GNUNET_DHT_MonitorPutMessage *) &pm[1];
pm->msg = (struct GNUNET_MessageHeader *) mmsg;
mmsg->header.size = htons (msize - sizeof (struct PendingMessage));
mmsg->header.type = htons (GNUNET_MESSAGE_TYPE_DHT_MONITOR_PUT);
mmsg->options = htonl(options);
mmsg->type = htonl(type);
mmsg->hop_count = htonl(hop_count);
mmsg->desired_replication_level = htonl(desired_replication_level);
mmsg->put_path_length = htonl(path_length);
msg_path = (struct GNUNET_PeerIdentity *) &mmsg[1];
if (path_length > 0)
{
memcpy (msg_path, path,
path_length * sizeof (struct GNUNET_PeerIdentity));
}
mmsg->expiration_time = GNUNET_TIME_absolute_hton(exp);
memcpy (&mmsg->key, key, sizeof (GNUNET_HashCode));
if (size > 0)
memcpy (&msg_path[path_length], data, size);
add_pending_message (m->client, pm); add_pending_message (m->client, pm);
} }
} }
GNUNET_free_non_null (cl); GNUNET_free_non_null (cl);
} }
/** /**
* Initialize client subsystem. * Initialize client subsystem.
* *
* @param server the initialized server * @param server the initialized server
skipping to change at line 1107 skipping to change at line 1319
{ {
static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = { static struct GNUNET_SERVER_MessageHandler plugin_handlers[] = {
{&handle_dht_local_put, NULL, {&handle_dht_local_put, NULL,
GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, 0}, GNUNET_MESSAGE_TYPE_DHT_CLIENT_PUT, 0},
{&handle_dht_local_get, NULL, {&handle_dht_local_get, NULL,
GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, 0}, GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET, 0},
{&handle_dht_local_get_stop, NULL, {&handle_dht_local_get_stop, NULL,
GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP, GNUNET_MESSAGE_TYPE_DHT_CLIENT_GET_STOP,
sizeof (struct GNUNET_DHT_ClientGetStopMessage)}, sizeof (struct GNUNET_DHT_ClientGetStopMessage)},
{&handle_dht_local_monitor, NULL, {&handle_dht_local_monitor, NULL,
GNUNET_MESSAGE_TYPE_DHT_MONITOR_GET, GNUNET_MESSAGE_TYPE_DHT_MONITOR_START,
sizeof (struct GNUNET_DHT_MonitorMessage)}, sizeof (struct GNUNET_DHT_MonitorStartStopMessage)},
{&handle_dht_local_monitor_stop, NULL,
GNUNET_MESSAGE_TYPE_DHT_MONITOR_STOP,
sizeof (struct GNUNET_DHT_MonitorStartStopMessage)},
{NULL, NULL, 0, 0} {NULL, NULL, 0, 0}
}; };
forward_map = GNUNET_CONTAINER_multihashmap_create (1024); forward_map = GNUNET_CONTAINER_multihashmap_create (1024);
retry_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MI N); retry_heap = GNUNET_CONTAINER_heap_create (GNUNET_CONTAINER_HEAP_ORDER_MI N);
GNUNET_SERVER_add_handlers (server, plugin_handlers); GNUNET_SERVER_add_handlers (server, plugin_handlers);
GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL) ; GNUNET_SERVER_disconnect_notify (server, &handle_client_disconnect, NULL) ;
} }
/** /**
* Shutdown client subsystem. * Shutdown client subsystem.
skipping to change at line 1130 skipping to change at line 1345
void void
GDS_CLIENTS_done () GDS_CLIENTS_done ()
{ {
GNUNET_assert (client_head == NULL); GNUNET_assert (client_head == NULL);
GNUNET_assert (client_tail == NULL); GNUNET_assert (client_tail == NULL);
if (GNUNET_SCHEDULER_NO_TASK != retry_task) if (GNUNET_SCHEDULER_NO_TASK != retry_task)
{ {
GNUNET_SCHEDULER_cancel (retry_task); GNUNET_SCHEDULER_cancel (retry_task);
retry_task = GNUNET_SCHEDULER_NO_TASK; retry_task = GNUNET_SCHEDULER_NO_TASK;
} }
GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap)); if (NULL != retry_heap)
GNUNET_CONTAINER_heap_destroy (retry_heap); {
retry_heap = NULL; GNUNET_assert (0 == GNUNET_CONTAINER_heap_get_size (retry_heap));
GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map)); GNUNET_CONTAINER_heap_destroy (retry_heap);
GNUNET_CONTAINER_multihashmap_destroy (forward_map); retry_heap = NULL;
forward_map = NULL; }
if (NULL != forward_map)
{
GNUNET_assert (0 == GNUNET_CONTAINER_multihashmap_size (forward_map));
GNUNET_CONTAINER_multihashmap_destroy (forward_map);
forward_map = NULL;
}
} }
/* end of gnunet-service-dht_clients.c */ /* end of gnunet-service-dht_clients.c */
 End of changes. 61 change blocks. 
108 lines changed or deleted 333 lines changed or added

This html diff was produced by rfcdiff 1.41. The latest version is available from http://tools.ietf.org/tools/rfcdiff/