summaryrefslogtreecommitdiff
path: root/child.c
diff options
context:
space:
mode:
Diffstat (limited to 'child.c')
-rw-r--r--child.c132
1 files changed, 119 insertions, 13 deletions
diff --git a/child.c b/child.c
index e3885ed..56c32d1 100644
--- a/child.c
+++ b/child.c
@@ -1,9 +1,11 @@
#include <stdint.h>
#include <string.h>
-#include "dist.h"
+#include "banking.h"
#include "ipc.h"
-#include "pa1.h"
+#include "pa2345.h"
+
+#include "dist.h"
void child_phase0( dist_info_t *info, uint8_t id ) {
close_redundant_pipes( info, id );
@@ -11,7 +13,7 @@ void child_phase0( dist_info_t *info, uint8_t id ) {
void child_phase1( dist_info_t *info, uint8_t id ) {
pid_t child_pid = getpid();
- char buf[512];
+ char buf[MAX_MESSAGE_LEN];
uint8_t i;
my_info_t me = {
@@ -19,14 +21,17 @@ void child_phase1( dist_info_t *info, uint8_t id ) {
.dist_info = info
};
- int size = snprintf( buf, sizeof(buf), log_started_fmt, id, child_pid, info->parent_pid );
+ int size = snprintf( buf, sizeof(buf), log_started_fmt, get_physical_time(),
+ id, child_pid,
+ info->parent_pid,
+ info->balances[ id ] );
Message msg = {
.s_header = {
.s_magic = MESSAGE_MAGIC,
.s_payload_len = size + 1,
.s_type = STARTED,
- .s_local_time = 0
+ .s_local_time = get_physical_time()
}
};
@@ -48,15 +53,88 @@ void child_phase1( dist_info_t *info, uint8_t id ) {
}
}
- snprintf( buf, sizeof(buf), log_received_all_started_fmt, id );
+ snprintf( buf, sizeof(buf), log_received_all_started_fmt, get_physical_time(), id );
fputs( buf, info->events_log );
fputs( buf, stdout );
}
-void child_phase2( dist_info_t *info, uint8_t id ) {
+void child_phase2( dist_info_t *info, uint8_t id, BalanceHistory *history ) {
+ Message msg;
+ BalanceState balanceState;
+ char buf[MAX_MESSAGE_LEN];
+
+ my_info_t me = {
+ .id = id,
+ .dist_info = info
+ };
+
+ uint8_t i;
+ uint8_t last_order_time = -1;
+
+ int running = 1;
+ while( running ) {
+ if( !receive_any( &me, &msg ) ) {
+ switch( msg.s_header.s_type ) {
+ case TRANSFER: {
+ if( msg.s_payload == NULL )
+ return;
+
+ TransferOrder *order = (TransferOrder *) msg.s_payload;
+ uint8_t cur_order_time = msg.s_header.s_local_time;
+
+ for( i = last_order_time + 1; i < cur_order_time; i++ ) {
+ balanceState.s_balance = info->balances[ id ];
+ balanceState.s_time = i;
+ balanceState.s_balance_pending_in = 0;
+ history->s_history[ i ] = balanceState;
+ }
+
+ if( order->s_src == id ) {
+ snprintf( buf, sizeof(buf), log_transfer_out_fmt,
+ get_physical_time(), order->s_src,
+ order->s_amount, order->s_dst );
+ fputs( buf, info->events_log );
+ fputs( buf, stdout );
+
+ info->balances[ id ] -= order->s_amount;
+ send( &me, order->s_dst, &msg );
+ } else if( order->s_dst == id ) {
+ snprintf( buf, sizeof(buf), log_transfer_in_fmt,
+ get_physical_time(), order->s_dst,
+ order->s_amount, order->s_src);
+ fputs( buf, info->events_log );
+ fputs( buf, stdout );
+
+ Message stopMsg = {
+ .s_header = {
+ .s_magic = MESSAGE_MAGIC,
+ .s_payload_len = 0,
+ .s_type = ACK,
+ .s_local_time = get_physical_time()
+ }
+ };
+ info->balances[ id ] += order->s_amount;
+ send( &me, PARENT_ID, &stopMsg );
+ }
+
+ balanceState.s_balance = info->balances[ id ];
+ balanceState.s_time = cur_order_time;
+ balanceState.s_balance_pending_in = 0;
+
+ history->s_history[ cur_order_time ] = balanceState;
+ history->s_history_len = cur_order_time + 1;
+ last_order_time = cur_order_time;
+ break;
+ }
+ case STOP:
+ running = 0;
+ break;
+ }
+ }
+ }
}
-void child_phase3( dist_info_t *info, uint8_t id ) {
+void child_phase3( dist_info_t *info, uint8_t id, BalanceHistory *history ) {
char buf[MAX_MESSAGE_LEN];
uint8_t i;
@@ -65,14 +143,16 @@ void child_phase3( dist_info_t *info, uint8_t id ) {
.dist_info = info
};
- int size = snprintf( buf, sizeof(buf), log_done_fmt, id );
+ int size = snprintf( buf, sizeof(buf), log_done_fmt,
+ get_physical_time(), id,
+ info->balances[ id ] );
Message msg = {
.s_header = {
.s_magic = MESSAGE_MAGIC,
.s_payload_len = size + 1,
.s_type = DONE,
- .s_local_time = 0
+ .s_local_time = get_physical_time()
}
};
@@ -94,12 +174,38 @@ void child_phase3( dist_info_t *info, uint8_t id ) {
}
}
- snprintf( buf, sizeof(buf), log_received_all_done_fmt, id );
+ snprintf( buf, sizeof(buf), log_received_all_done_fmt,
+ get_physical_time(), id );
fputs( buf, info->events_log );
fputs( buf, stdout );
+
+ int time = get_physical_time();
+ for( i = history->s_history_len; i < time; i++ ) {
+ history->s_history[i].s_balance = history->s_history[ history->s_history_len - 1].s_balance;
+ history->s_history[i].s_time = i;
+ }
+
+ history->s_history_len = time;
+
+ int wr_size = history->s_history_len * sizeof(history->s_history[0]);
+ Message historyMsg = {
+ .s_header = {
+ .s_magic = MESSAGE_MAGIC,
+ .s_payload_len = wr_size,
+ .s_type = BALANCE_HISTORY,
+ .s_local_time = time
+ }
+ };
+
+ memcpy( historyMsg.s_payload, history, wr_size );
+ send( &me, PARENT_ID, &historyMsg );
}
void child_workflow( dist_info_t *info, uint8_t id ) {
+ BalanceHistory history = {
+ .s_id = id
+ };
+
/* CHILD PHASE 0: Close all redundant descriptors */
child_phase0(info, id);
@@ -108,9 +214,9 @@ void child_workflow( dist_info_t *info, uint8_t id ) {
child_phase1(info, id);
/* CHILD PHASE 2: Do some useful work */
- child_phase2(info, id);
+ child_phase2(info, id, &history);
/* CHILD PHASE 3: Send DONE message for all, receive DONE messages from
* other childs and exit */
- child_phase3(info, id);
+ child_phase3(info, id, &history);
}