summaryrefslogtreecommitdiff
path: root/ipc.c
diff options
context:
space:
mode:
Diffstat (limited to 'ipc.c')
-rw-r--r--ipc.c78
1 files changed, 74 insertions, 4 deletions
diff --git a/ipc.c b/ipc.c
index 607c50e..0b7a9d7 100644
--- a/ipc.c
+++ b/ipc.c
@@ -1,5 +1,7 @@
+#define _DEFAULT_SOURCE
#include <stdint.h>
#include <unistd.h>
+#include <string.h>
#include "ipc.h"
#include "main.h"
@@ -12,7 +14,7 @@ int send(void * self, local_id dst, const Message * msg) {
if( me->dist_info == NULL )
return 1;
- if( dst < 0 || dst > MAX_PROCCNT )
+ if( dst < 0 || dst > me->dist_info->x )
return 1;
uint8_t base = me->id * (me->dist_info->proccnt - 1); // base of block of pipes for this process
@@ -46,9 +48,9 @@ int send_multicast(void * self, const Message * msg) {
uint8_t i;
for( i = 0; i < me->dist_info->proccnt - 1; i++) {
- wr_dscr = me->dist_info->pipes[base+i][1];
+ wr_dscr = me->dist_info->pipes[ base + i][1];
wr_size = sizeof(msg->s_header) + msg->s_header.s_payload_len;
- if( write( wr_dscr, &msg, wr_size ) != wr_size )
+ if( write( wr_dscr, msg, wr_size ) != wr_size )
return 1;
}
@@ -56,9 +58,77 @@ int send_multicast(void * self, const Message * msg) {
}
int receive(void * self, local_id from, Message * msg) {
+ if( self == NULL )
+ return 1;
+
+ my_info_t *me = (my_info_t *) self;
+ if( me->dist_info == NULL )
+ return 1;
+
+ if( from < 0 || from > me->dist_info->x )
+ return 1;
+
+ uint8_t base = from * (me->dist_info->proccnt - 1);
+ uint8_t idx = ( me->id > from ? base + me->id - 1 : base + me->id );
+ int descriptor = me->dist_info->pipes[idx][0];
+
+ char buf[MAX_MESSAGE_LEN];
+ int size;
+
+ while( 1 ) {
+ if( ( size = read( descriptor, buf, sizeof( buf ) ) ) > 0 ) {
+ memcpy( msg, buf, sizeof( *msg ) );
+ return 0;
+ }
+ usleep(100000); // sleep 100 ms
+ }
return -1;
}
int receive_any(void * self, Message * msg) {
- return -1;
+ if( self == NULL )
+ return 1;
+
+ my_info_t *me = (my_info_t *) self;
+ if( me->dist_info == NULL )
+ return 1;
+
+ uint8_t i, j = 0;
+ int descriptors[MAX_PROCCNT - 1];
+ for(i = 0; i < me->dist_info->proccnt; i++) {
+ // don't see at pipes as ( ID -> Y )
+ if( me->id == i )
+ continue;
+
+ // base of block of pipes for this process
+ uint8_t base = i * (me->dist_info->proccnt - 1);
+
+ // index number in block of pipes for ( X -> ID )
+ uint8_t idx = ( me->id > i ? base + me->id - 1 : base + me->id );
+
+ descriptors[j++] = me->dist_info->pipes[idx][0];
+ }
+
+ char buf[MAX_MESSAGE_LEN];
+ int size;
+
+ while( 1 ) {
+ for( i = 0; i < j; i++ ) {
+ if( ( size = read( descriptors[i], buf, sizeof( buf ) ) ) > 0 ) {
+ memcpy( msg, buf, sizeof( *msg ) );
+ /*switch( msg->s_header.s_type ) {
+ case STARTED:
+ fprintf( stderr, "[ID: %d] Message STARTED received\n", me->id );
+ break;
+
+ case DONE:
+ fprintf( stderr, "[ID: %d] Message DONE received\n", me->id );
+ break;
+ }*/
+ return 0;
+ }
+ }
+ usleep(100000); // sleep 100 ms
+ }
+ return 1;
}