diff options
author | Sergey Nazaryev <sergey@nazaryev.ru> | 2016-04-03 14:57:31 +0300 |
---|---|---|
committer | Sergey Nazaryev <sergey@nazaryev.ru> | 2016-04-03 14:57:31 +0300 |
commit | a02cc89f160ad4611232992e493c8aaf5212a346 (patch) | |
tree | a6f15a51de73d956e6d4ff49b4197824ff0fd602 | |
download | pa2-a02cc89f160ad4611232992e493c8aaf5212a346.zip pa2-a02cc89f160ad4611232992e493c8aaf5212a346.tar.gz pa2-a02cc89f160ad4611232992e493c8aaf5212a346.tar.bz2 |
Initial commit
-rw-r--r-- | Makefile | 12 | ||||
-rw-r--r-- | child.c | 88 | ||||
-rw-r--r-- | child.h | 10 | ||||
-rw-r--r-- | common.h | 17 | ||||
-rw-r--r-- | ipc.c | 64 | ||||
-rw-r--r-- | ipc.h | 115 | ||||
-rw-r--r-- | main.c | 86 | ||||
-rw-r--r-- | main.h | 29 | ||||
-rw-r--r-- | pa1.h | 28 | ||||
-rw-r--r-- | parent.c | 43 | ||||
-rw-r--r-- | parent.h | 8 | ||||
-rw-r--r-- | test.c | 15 |
12 files changed, 515 insertions, 0 deletions
diff --git a/Makefile b/Makefile new file mode 100644 index 0000000..606e9fb --- /dev/null +++ b/Makefile @@ -0,0 +1,12 @@ +all: dist + +dist: main.o parent.o child.o ipc.o + gcc -o $@ $^ + +main.o: main.c +parent.o: parent.c +child.o: child.c +ipc.o: ipc.c + +clean: + rm -f *.o dist @@ -0,0 +1,88 @@ +#include <stdint.h> + +#include "main.h" +#include "ipc.h" + +void child_phase0( dist_info_t *info, uint8_t id ) { + close_redundant_pipes( info, id ); +} + +void child_phase1( dist_info_t *info, uint8_t id ) { + uint8_t receive_cnt; + Message msg; + + my_info_t me = { + .id = id, + .dist_info = info + }; + + Message start_msg = { + .s_header = { + .s_magic = MESSAGE_MAGIC, + .s_payload_len = 0, + .s_type = STARTED, + .s_local_time = 0 + } + }; + + send_multicast(&me, &start_msg); + + receive_cnt = info->x; + while( receive_cnt ) { + if( !receive_any( &me, &msg ) ) { + if( msg.s_header.s_type != STARTED ) + _exit(1); + + receive_cnt--; + } + } +} + +void child_phase2( dist_info_t *info, uint8_t id ) { +} + +void child_phase3( dist_info_t *info, uint8_t id ) { + uint8_t receive_cnt; + Message msg; + + my_info_t me = { + .id = id, + .dist_info = info + }; + + Message done_msg = { + .s_header = { + .s_magic = MESSAGE_MAGIC, + .s_payload_len = 0, + .s_type = DONE, + .s_local_time = 0 + } + }; + + send_multicast(&me, &done_msg); + receive_cnt = info->x; + while( receive_cnt ) { + if( !receive_any( &me, &msg ) ) { + if( msg.s_header.s_type != DONE ) + _exit(1); + + receive_cnt--; + } + } +} + +void child_workflow( dist_info_t *info, uint8_t id ) { + /* CHILD PHASE 0: Close all redundant descriptors */ + child_phase0(info, id); + + /* CHILD PHASE 1: Send STARTED message for all, receive STARTED + * messages from other childs */ + child_phase1(info, id); + + /* CHILD PHASE 2: Do some useful work */ + child_phase2(info, id); + + /* CHILD PHASE 3: Send DONE message for all, receive DONE messages from + * other childs and exit */ + child_phase3(info, id); +} @@ -0,0 +1,10 @@ +#ifndef __IFMO_DISTRIBUTED_CLASS_CHILD__H +#define __IFMO_DISTRIBUTED_CLASS_CHILD__H + +#include <stdint.h> + +#include "main.h" + +void child_workflow( dist_info_t *info, uint8_t id ); + +#endif // __IFMO_DISTRIBUTED_CLASS_CHILD__H diff --git a/common.h b/common.h new file mode 100644 index 0000000..9be7008 --- /dev/null +++ b/common.h @@ -0,0 +1,17 @@ +/** + * @file common.h + * @Author Michael Kosyakov and Evgeniy Ivanov (ifmo.distributedclass@gmail.com) + * @date March, 2014 + * @brief Common definitions and constants for for programming assignments + * + * Students must not modify this file! + */ + +#ifndef __IFMO_DISTRIBUTED_CLASS_COMMON__H +#define __IFMO_DISTRIBUTED_CLASS_COMMON__H + +// Not extern for simplicity only +static const char * const events_log = "events.log"; +static const char * const pipes_log = "pipes.log"; + +#endif // __IFMO_DISTRIBUTED_CLASS_COMMON__H @@ -0,0 +1,64 @@ +#include <stdint.h> +#include <unistd.h> + +#include "ipc.h" +#include "main.h" + +int send(void * self, local_id dst, const Message * msg) { + if( self == NULL ) + return 1; + + my_info_t *me = (my_info_t *) self; + if( me->dist_info == NULL ) + return 1; + + if( dst < 0 || dst > MAX_PROCCNT ) + return 1; + + uint8_t base = me->id * (me->dist_info->proccnt - 1); // base of block of pipes for this process + uint8_t idx = 0; + + if( me->id < dst ) + idx = base + dst - 1; + else + idx = base + dst; + + int wr_dscr = me->dist_info->pipes[idx][1]; + size_t wr_size = sizeof(msg->s_header) + msg->s_header.s_payload_len; + if( write( wr_dscr, &msg, wr_size ) != wr_size ) + return 1; + + return 0; +} + +int send_multicast(void * self, const Message * msg) { + if( self == NULL ) + return 1; + + my_info_t *me = (my_info_t *) self; + if( me->dist_info == NULL ) + return 1; + + size_t wr_size; + int wr_dscr; + + uint8_t base = me->id * (me->dist_info->proccnt - 1); // base of block of pipes for this process + uint8_t i; + + for( i = 0; i < me->dist_info->proccnt - 1; i++) { + wr_dscr = me->dist_info->pipes[base+i][1]; + wr_size = sizeof(msg->s_header) + msg->s_header.s_payload_len; + if( write( wr_dscr, &msg, wr_size ) != wr_size ) + return 1; + } + + return 0; +} + +int receive(void * self, local_id from, Message * msg) { + return -1; +} + +int receive_any(void * self, Message * msg) { + return -1; +} @@ -0,0 +1,115 @@ +/** + * @file ipc.h + * @Author Michael Kosyakov and Evgeniy Ivanov (ifmo.distributedclass@gmail.com) + * @date March, 2014 + * @brief A simple IPC library for programming assignments + * + * Students must not modify this file! + */ + +#ifndef __IFMO_DISTRIBUTED_CLASS_IPC__H +#define __IFMO_DISTRIBUTED_CLASS_IPC__H + +#include <stddef.h> +#include <stdint.h> + +//------------------------------------------------------------------------------ + +typedef int8_t local_id; +typedef int16_t timestamp_t; + +enum { + MESSAGE_MAGIC = 0xAFAF, + MAX_MESSAGE_LEN = 4096, + PARENT_ID = 0, + MAX_PROCESS_ID = 15 +}; + +typedef enum { + STARTED = 0, ///< message with string (doesn't include trailing '\0') + DONE, ///< message with string (doesn't include trailing '\0') + ACK, ///< empty message + STOP, ///< empty message + TRANSFER, ///< message with TransferOrder + BALANCE_HISTORY, ///< message with BalanceHistory + CS_REQUEST, ///< empty message + CS_REPLY, ///< empty message + CS_RELEASE ///< empty message +} MessageType; + +typedef struct { + uint16_t s_magic; ///< magic signature, must be MESSAGE_MAGIC + uint16_t s_payload_len; ///< length of payload + int16_t s_type; ///< type of the message + timestamp_t s_local_time; ///< set by sender, depends on particular PA: + ///< physical time in PA2 or Lamport's scalar + ///< time in PA3 +} __attribute__((packed)) MessageHeader; + +enum { + MAX_PAYLOAD_LEN = MAX_MESSAGE_LEN - sizeof(MessageHeader) +}; + +typedef struct { + MessageHeader s_header; + char s_payload[MAX_PAYLOAD_LEN]; ///< Must be used as a buffer, unused "tail" + ///< shouldn't be transfered +} __attribute__((packed)) Message; + +//------------------------------------------------------------------------------ + +/** Send a message to the process specified by id. + * + * @param self Any data structure implemented by students to perform I/O + * @param dst ID of recepient + * @param msg Message to send + * + * @return 0 on success, any non-zero value on error + */ +int send(void * self, local_id dst, const Message * msg); + +//------------------------------------------------------------------------------ + +/** Send multicast message. + * + * Send msg to all other processes including parrent. + * Should stop on the first error. + * + * @param self Any data structure implemented by students to perform I/O + * @param msg Message to multicast. + * + * @return 0 on success, any non-zero value on error + */ +int send_multicast(void * self, const Message * msg); + +//------------------------------------------------------------------------------ + +/** Receive a message from the process specified by id. + * + * Might block depending on IPC settings. + * + * @param self Any data structure implemented by students to perform I/O + * @param from ID of the process to receive message from + * @param msg Message structure allocated by the caller + * + * @return 0 on success, any non-zero value on error + */ +int receive(void * self, local_id from, Message * msg); + +//------------------------------------------------------------------------------ + +/** Receive a message from any process. + * + * Receive a message from any process, in case of blocking I/O should be used + * with extra care to avoid deadlocks. + * + * @param self Any data structure implemented by students to perform I/O + * @param msg Message structure allocated by the caller + * + * @return 0 on success, any non-zero value on error + */ +int receive_any(void * self, Message * msg); + +//------------------------------------------------------------------------------ + +#endif // __IFMO_DISTRIBUTED_CLASS_IPC__H @@ -0,0 +1,86 @@ +#include <stdio.h> + +#include "main.h" +#include "child.h" +#include "parent.h" +#include "common.h" + +static char* app_name; + +void usage( FILE* output ) { + fprintf(output, "usage: %s -p <x>\n", app_name); + fprintf(output, "`x` must be int between 1 and %d\n", MAX_X); +} + +void close_redundant_pipes( dist_info_t *info, uint8_t id ) { + uint8_t i, j; + for(i = 0; i < info->proccnt; i++) { + // don't close all pipes ( ID -> Y ) + if( id == i ) + continue; + + uint8_t base = i * (info->proccnt-1); // base of block of pipes for this process + uint8_t idx; // index number in block of pipes for ( X -> ID ) + + if( id < i ) + idx = id - 1; + else + idx = id; + + for(j = 0; j < info->proccnt-1; j++) { + if( j == idx ) { + close( info->pipes[base+j][1] ); // close only write descriptor of ( X -> ID ) + } else { + close( info->pipes[base+j][0] ); // close read descriptor of ( X -> Y, where Y != ID ) + close( info->pipes[base+j][1] ); // close write descriptor of ( X -> Y, where Y != ID ) + } + } + } +} + +int main(int argc, char* argv[]) { + uint8_t i, j; + dist_info_t info; + app_name = argv[0]; + + if( argc != 2 || strcmp( argv[1], "-p") != 0 ) { + usage(stdout); + return 1; + } + + // fill info about distributed network + info.x = atoi(argv[2]); + if( info.x < 0 || info.x > MAX_X ) { + usage(stdout); + return 1; + } + + info.proccnt = info.x + 1; + info.parent_pid = getpid(); + info.events_log = fopen( events_log, "w" ); + info.pipes_log = fopen( pipes_log, "w" ); + + // open all (N * (N-1)) pipes for a full mesh topology + for( i = 0; i < info.proccnt * (info.proccnt - 1); i++ ) + pipe(info.pipes[i]); + +// for( i = 1; i < info.proccnt; i++ ) { +// pid_t pid = fork(); +// switch(pid) { +// case 0: +// /* Child branch */ +// child_workflow( &info, i ); +// _exit(0); +// break; +// case -1: +// /* Error */ +// perror("fork"); +// return 1; +// default: +// break; +// } +// } +// +// parent_workflow( &info ); + return 0; +} @@ -0,0 +1,29 @@ +#ifndef __IFMO_DISTRIBUTED_CLASS_MAIN__H +#define __IFMO_DISTRIBUTED_CLASS_MAIN__H + +#include <stdio.h> +#include <stdint.h> +#include <unistd.h> + +#define MAX_X ( 10 ) +#define MAX_PROCCNT ( MAX_X + 1 ) +#define MAX_PIPES ( MAX_PROCCNT ) * ( MAX_PROCCNT - 1 ) + +typedef struct { + int pipes[MAX_PIPES][2]; + uint8_t proccnt; + uint8_t x; + + FILE* events_log; + FILE* pipes_log; + pid_t parent_pid; +} dist_info_t; + +typedef struct { + int id; + dist_info_t *dist_info; +} my_info_t; + +void close_redundant_pipes( dist_info_t *info, uint8_t id ); + +#endif // __IFMO_DISTRIBUTED_CLASS_MAIN__H @@ -0,0 +1,28 @@ +/** + * @file pa1.h + * @Author Michael Kosyakov and Evgeniy Ivanov (ifmo.distributedclass@gmail.com) + * @date March, 2014 + * @brief Constants for programming assignment 1 + * + * Students must not modify this file! + */ + +#ifndef __IFMO_DISTRIBUTED_CLASS_PA1__H +#define __IFMO_DISTRIBUTED_CLASS_PA1__H + +/* %1d - local id, %5d - PID, e.g. + * Process 1 (pid 12341, parent 12340) has STARTED\n + */ +static const char * const log_started_fmt = + "Process %1d (pid %5d, parent %5d) has STARTED\n"; + +static const char * const log_received_all_started_fmt = + "Process %1d received all STARTED messages\n"; + +static const char * const log_done_fmt = + "Process %1d has DONE its work\n"; + +static const char * const log_received_all_done_fmt = + "Process %1d received all DONE messages\n"; + +#endif // __IFMO_DISTRIBUTED_CLASS_PA1__H diff --git a/parent.c b/parent.c new file mode 100644 index 0000000..54de9b0 --- /dev/null +++ b/parent.c @@ -0,0 +1,43 @@ +#include <stdint.h> + +#include "main.h" +#include "ipc.h" + +void parent_workflow( dist_info_t *info ) { + Message msg; + uint8_t receive_cnt; + + my_info_t me = { + .id = PARENT_ID, + .dist_info = info + }; + + // PARENT PHASE 0: Close all redundant descriptors + close_redundant_pipes( info, PARENT_ID ); + + // PARENT PHASE 1: Receive all STARTED messages + receive_cnt = info->x; + while( receive_cnt ) { + Message msg; + if( receive_any( &me, &msg ) ) { + if( msg.s_header.s_type != STARTED ) + return; + + receive_cnt--; + } + } + + // PARENT PHASE 2: Receive all DONE messages + receive_cnt = info->x; + while( receive_cnt ) { + Message msg; + if( receive_any( &me, &msg ) ) { + if( msg.s_header.s_type != DONE ) + return; + + receive_cnt--; + } + } + + // PARENT PHASE 3: Wait for the end of all processes +} diff --git a/parent.h b/parent.h new file mode 100644 index 0000000..eec9c49 --- /dev/null +++ b/parent.h @@ -0,0 +1,8 @@ +#ifndef __IFMO_DISTRIBUTED_CLASS_PARENT__H +#define __IFMO_DISTRIBUTED_CLASS_PARENT__H + +#include "main.h" + +void parent_workflow( dist_info_t *info ); + +#endif // __IFMO_DISTRIBUTED_CLASS_PARENT__H @@ -0,0 +1,15 @@ +#include <stdio.h> +#include <unistd.h> + +int main() { + int test[2]; + pipe(&test); + + char buf[] = "123"; + char buf2[4]; + write( test[1], buf, sizeof(buf) ); + printf("lol\n"); + read( test[0], buf2, sizeof(buf2) ); + printf("answer: %s\n", buf2); +} + |