diff options
Diffstat (limited to 'ipc.c')
-rw-r--r-- | ipc.c | 78 |
1 files changed, 74 insertions, 4 deletions
@@ -1,5 +1,7 @@ +#define _DEFAULT_SOURCE #include <stdint.h> #include <unistd.h> +#include <string.h> #include "ipc.h" #include "main.h" @@ -12,7 +14,7 @@ int send(void * self, local_id dst, const Message * msg) { if( me->dist_info == NULL ) return 1; - if( dst < 0 || dst > MAX_PROCCNT ) + if( dst < 0 || dst > me->dist_info->x ) return 1; uint8_t base = me->id * (me->dist_info->proccnt - 1); // base of block of pipes for this process @@ -46,9 +48,9 @@ int send_multicast(void * self, const Message * msg) { uint8_t i; for( i = 0; i < me->dist_info->proccnt - 1; i++) { - wr_dscr = me->dist_info->pipes[base+i][1]; + wr_dscr = me->dist_info->pipes[ base + i][1]; wr_size = sizeof(msg->s_header) + msg->s_header.s_payload_len; - if( write( wr_dscr, &msg, wr_size ) != wr_size ) + if( write( wr_dscr, msg, wr_size ) != wr_size ) return 1; } @@ -56,9 +58,77 @@ int send_multicast(void * self, const Message * msg) { } int receive(void * self, local_id from, Message * msg) { + if( self == NULL ) + return 1; + + my_info_t *me = (my_info_t *) self; + if( me->dist_info == NULL ) + return 1; + + if( from < 0 || from > me->dist_info->x ) + return 1; + + uint8_t base = from * (me->dist_info->proccnt - 1); + uint8_t idx = ( me->id > from ? base + me->id - 1 : base + me->id ); + int descriptor = me->dist_info->pipes[idx][0]; + + char buf[MAX_MESSAGE_LEN]; + int size; + + while( 1 ) { + if( ( size = read( descriptor, buf, sizeof( buf ) ) ) > 0 ) { + memcpy( msg, buf, sizeof( *msg ) ); + return 0; + } + usleep(100000); // sleep 100 ms + } return -1; } int receive_any(void * self, Message * msg) { - return -1; + if( self == NULL ) + return 1; + + my_info_t *me = (my_info_t *) self; + if( me->dist_info == NULL ) + return 1; + + uint8_t i, j = 0; + int descriptors[MAX_PROCCNT - 1]; + for(i = 0; i < me->dist_info->proccnt; i++) { + // don't see at pipes as ( ID -> Y ) + if( me->id == i ) + continue; + + // base of block of pipes for this process + uint8_t base = i * (me->dist_info->proccnt - 1); + + // index number in block of pipes for ( X -> ID ) + uint8_t idx = ( me->id > i ? base + me->id - 1 : base + me->id ); + + descriptors[j++] = me->dist_info->pipes[idx][0]; + } + + char buf[MAX_MESSAGE_LEN]; + int size; + + while( 1 ) { + for( i = 0; i < j; i++ ) { + if( ( size = read( descriptors[i], buf, sizeof( buf ) ) ) > 0 ) { + memcpy( msg, buf, sizeof( *msg ) ); + /*switch( msg->s_header.s_type ) { + case STARTED: + fprintf( stderr, "[ID: %d] Message STARTED received\n", me->id ); + break; + + case DONE: + fprintf( stderr, "[ID: %d] Message DONE received\n", me->id ); + break; + }*/ + return 0; + } + } + usleep(100000); // sleep 100 ms + } + return 1; } |