changeset 1958:9c01945b5d20

Use zlib to compress event log streams
author Mike Pavone <pavone@retrodev.com>
date Sat, 02 May 2020 17:33:23 -0700
parents ba06346611a1
children 6d99bdbf1e3e
files blastem.c event_log.c event_log.h gen_player.c
diffstat 4 files changed, 271 insertions(+), 73 deletions(-) [+]
line wrap: on
line diff
--- a/blastem.c	Sat May 02 00:52:21 2020 -0700
+++ b/blastem.c	Sat May 02 17:33:23 2020 -0700
@@ -598,7 +598,6 @@
 		} else if (!loaded) {
 			reader_port = parse_addr_port(argv[i]);
 			if (reader_port) {
-				//init_event_reader_tcp(&reader, argv[i], port);
 				reader_addr = argv[i];
 			} else {
 				if (!(cart.size = load_rom(argv[i], &cart.buffer, stype == SYSTEM_UNKNOWN ? &stype : NULL))) {
@@ -722,6 +721,8 @@
 			fatal_error("Failed to detect system type for %s\n", romfname);
 		}
 		game_system = current_system = alloc_config_player(stype, &reader);
+		//free inflate stream as it was inflateCopied to an internal event reader in the player
+		inflateEnd(&reader.input_stream);
 		setup_saves(&cart, current_system);
 		update_title(current_system->info.name);
 	}
--- a/event_log.c	Sat May 02 00:52:21 2020 -0700
+++ b/event_log.c	Sat May 02 17:33:23 2020 -0700
@@ -15,6 +15,7 @@
 #include "util.h"
 #include "blastem.h"
 #include "saves.h"
+#include "zlib/zlib.h"
 
 enum {
 	CMD_GAMEPAD_DOWN,
@@ -24,9 +25,47 @@
 static uint8_t active, fully_active;
 static FILE *event_file;
 static serialize_buffer buffer;
+static uint8_t *compressed;
+static size_t compressed_storage;
+static z_stream output_stream;
+static uint32_t last;
+
+static void event_log_common_init(void)
+{
+	init_serialize(&buffer);
+	compressed_storage = 128*1024;
+	compressed = malloc(compressed_storage);
+	deflateInit(&output_stream, 9);
+	output_stream.avail_out = compressed_storage;
+	output_stream.next_out = compressed;
+	output_stream.avail_in = 0;
+	output_stream.next_in = buffer.data;
+	last = 0;
+	active = 1;
+}
+
+static uint8_t multi_count;
+static size_t multi_start;
+static void finish_multi(void)
+{
+	buffer.data[multi_start] |= multi_count - 2;
+	multi_count = 0;
+}
+
+static void file_finish(void)
+{
+	fwrite(compressed, 1, output_stream.next_out - compressed, event_file);
+	output_stream.next_out = compressed;
+	output_stream.avail_out = compressed_storage;
+	int result = deflate(&output_stream, Z_FINISH);
+	if (Z_STREAM_END != result) {
+		fatal_error("Final deflate call returned %d\n", result);
+	}
+	fwrite(compressed, 1, output_stream.next_out - compressed, event_file);
+	fclose(event_file);
+}
 
 static const char el_ident[] = "BLSTEL\x02\x00";
-static uint32_t last;
 void event_log_file(char *fname)
 {
 	event_file = fopen(fname, "wb");
@@ -35,9 +74,9 @@
 		return;
 	}
 	fwrite(el_ident, 1, sizeof(el_ident) - 1, event_file);
-	init_serialize(&buffer);
-	active = fully_active = 1;
-	last = 0;
+	event_log_common_init();
+	fully_active = 1;
+	atexit(file_finish);
 }
 
 static int listen_sock, remotes[7];
@@ -70,7 +109,7 @@
 		goto cleanup_address;
 	}
 	socket_blocking(listen_sock, 0);
-	active = 1;
+	event_log_common_init();
 cleanup_address:
 	freeaddrinfo(result);
 }
@@ -90,12 +129,15 @@
 	}
 	save_int8(&buffer, name_len);
 	save_buffer8(&buffer, name, strlen(name));
-	if (!fully_active) {
+	if (listen_sock) {
 		system_start = malloc(buffer.size);
 		system_start_size = buffer.size;
 		memcpy(system_start, buffer.data, buffer.size);
-		buffer.size = 0;
+	} else {
+		//system start header is never compressed, so write to file immediately
+		fwrite(buffer.data, 1, buffer.size, event_file);
 	}
+	buffer.size = 0;
 }
 
 //header formats
@@ -106,21 +148,17 @@
 #define FORMAT_4BYTE 0xF0
 static uint8_t last_event_type = 0xFF;
 static uint32_t last_delta;
-static uint8_t multi_count;
-static size_t multi_start;
 static void event_header(uint8_t type, uint32_t cycle)
 {
 	uint32_t delta = cycle - last;
 	if (multi_count) {
 		if (type != last_event_type || delta != last_delta) {
-			buffer.data[multi_start] |= multi_count - 2;
-			multi_count = 0;
+			finish_multi();
 		} else {
 			++multi_count;
 			if (multi_count == 17) {
-				buffer.data[multi_start] |= multi_count - 2;
+				finish_multi();
 				last_event_type = 0xFF;
-				multi_count = 0;
 			}
 			return;
 		}
@@ -159,7 +197,7 @@
 	save_int32(&buffer, deduction);
 }
 
-static size_t remote_send_progress[7];
+static uint8_t *remote_send_progress[7];
 static uint8_t remote_needs_state[7];
 static void flush_socket(void)
 {
@@ -174,11 +212,11 @@
 			current_system->save_state = EVENTLOG_SLOT + 1;
 		}
 	}
-	size_t min_progress = 0;
+	uint8_t *min_progress = compressed;
 	for (int i = 0; i < num_remotes; i++) {
 		int sent = 1;
 		if (remote_needs_state[i]) {
-			remote_send_progress[i] = buffer.size;
+			remote_send_progress[i] = output_stream.next_out;
 		} else {
 			uint8_t buffer[1500];
 			int bytes = recv(remotes[i], buffer, sizeof(buffer), 0);
@@ -210,9 +248,9 @@
 				}
 			}
 		}
-		while (sent && buffer.size - remote_send_progress[i])
+		while (sent && output_stream.next_out > remote_send_progress[i])
 		{
-			sent = send(remotes[i], buffer.data + remote_send_progress[i], buffer.size - remote_send_progress[i], 0);
+			sent = send(remotes[i], remote_send_progress[i], output_stream.next_out - remote_send_progress[i], 0);
 			if (sent >= 0) {
 				remote_send_progress[i] += sent;
 			} else if (socket_error_is_wouldblock()) {
@@ -229,11 +267,12 @@
 			}
 		}
 	}
-	if (min_progress == buffer.size) {
-		buffer.size = 0;
-		memset(remote_send_progress, 0, sizeof(remote_send_progress));
-		multi_count = 0;
-		last_event_type = 0xFF;
+	if (min_progress == output_stream.next_out) {
+		output_stream.next_out = compressed;
+		output_stream.avail_out = compressed_storage;
+		for (int i = 0; i < num_remotes; i++) {
+			remote_send_progress[i] = compressed;
+		}
 	}
 }
 
@@ -245,13 +284,26 @@
 	event_header(type, cycle);
 	last = cycle;
 	save_buffer8(&buffer, payload, size);
-	if (listen_sock && buffer.size > 1280) {
-		if (multi_count) {
-			buffer.data[multi_start] |= multi_count - 2;
-			multi_count = 0;
-			last_event_type = 0xFF;
+	if (!multi_count) {
+		last_event_type = 0xFF;
+		output_stream.avail_in = buffer.size - (output_stream.next_in - buffer.data);
+		int result = deflate(&output_stream, Z_NO_FLUSH);
+		if (result != Z_OK) {
+			fatal_error("deflate returned %d\n", result);
 		}
-		flush_socket();
+		if (listen_sock) {
+			if ((output_stream.next_out - compressed) > 1280 || !output_stream.avail_out) {
+				flush_socket();
+			}
+		} else if (!output_stream.avail_out) {
+			fwrite(compressed, 1, compressed_storage, event_file);
+			output_stream.next_out = compressed;
+			output_stream.avail_out = compressed_storage;
+		}
+		if (!output_stream.avail_in) {
+			buffer.size = 0;
+			output_stream.next_in = buffer.data;
+		}
 	}
 }
 
@@ -300,6 +352,39 @@
 	return total;
 }
 
+void deflate_flush(uint8_t full)
+{
+	output_stream.avail_in = buffer.size - (output_stream.next_in - buffer.data);
+	while (output_stream.avail_in)
+	{
+		if (!output_stream.avail_out) {
+			size_t old_storage = compressed_storage;
+			uint8_t *old_compressed = compressed;
+			compressed_storage *= 2;
+			compressed = realloc(compressed, compressed_storage);
+			output_stream.next_out = compressed + old_storage;
+			output_stream.avail_out = old_storage;
+			for (int i = 0; i < num_remotes; i++) {
+				if (!remote_needs_state[i]) {
+					remote_send_progress[i] = compressed + (remote_send_progress[i] - old_compressed);
+				}
+			}
+		}
+		int result = deflate(&output_stream, full ? Z_FINISH : Z_SYNC_FLUSH);
+		if (result != (full ? Z_STREAM_END : Z_OK)) {
+			fatal_error("deflate returned %d\n", result);
+		}
+		if (full) {
+			result = deflateReset(&output_stream);
+			if (result != Z_OK) {
+				fatal_error("deflateReset returned %d\n", result);
+			}
+		}
+	}
+	output_stream.next_in = buffer.data;
+	buffer.size = 0;
+}
+
 void event_state(uint32_t cycle, serialize_buffer *state)
 {
 	if (!fully_active) {
@@ -311,20 +396,12 @@
 		last_byte_address >> 8, last_byte_address,
 		state->size >> 16, state->size >> 8, state->size
 	};
+	uint8_t sent_system_start = 0;
 	for (int i = 0; i < num_remotes; i++)
 	{
 		if (remote_needs_state[i]) {
-			if(
-				send_all(remotes[i], system_start, system_start_size, 0) == system_start_size
-				&& send_all(remotes[i], header, sizeof(header), 0) == sizeof(header)
-				&& send_all(remotes[i], state->data, state->size, 0) == state->size
-			) {
-				remote_send_progress[i] = buffer.size;
-				remote_needs_state[i] = 0;
-				socket_blocking(remotes[i], 0);
-				int flag = 1;
-				setsockopt(remotes[i], IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag));
-				fully_active = 1;
+			if (send_all(remotes[i], system_start, system_start_size, 0) == system_start_size) {
+				sent_system_start = 1;
 			} else {
 				socket_close(remotes[i]);
 				remotes[i] = remotes[num_remotes-1];
@@ -335,6 +412,41 @@
 			}
 		}
 	}
+	if (sent_system_start) {
+		if (fully_active) {
+			if (multi_count) {
+				finish_multi();
+			}
+			//full flush is needed so new and old clients can share a stream
+			deflate_flush(1);
+		}
+		save_buffer8(&buffer, header, sizeof(header));
+		save_buffer8(&buffer, state->data, state->size);
+		size_t old_compressed_size = output_stream.next_out - compressed;
+		deflate_flush(1);
+		size_t state_size = output_stream.next_out - compressed - old_compressed_size;
+		for (int i = 0; i < num_remotes; i++) {
+			if (remote_needs_state[i]) {
+				if (send_all(remotes[i], compressed + old_compressed_size, state_size, 0) == state_size) {
+					remote_send_progress[i] = compressed + old_compressed_size;
+					remote_needs_state[i] = 0;
+					socket_blocking(remotes[i], 0);
+					int flag = 1;
+					setsockopt(remotes[i], IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag));
+					fully_active = 1;
+				} else {
+					socket_close(remotes[i]);
+					remotes[i] = remotes[num_remotes-1];
+					remote_send_progress[i] = remote_send_progress[num_remotes-1];
+					remote_needs_state[i] = remote_needs_state[num_remotes-1];
+					num_remotes--;
+					i--;
+				}
+			}
+		}
+		output_stream.next_out = compressed + old_compressed_size;
+		output_stream.avail_out = compressed_storage - old_compressed_size;
+	}
 }
 
 void event_flush(uint32_t cycle)
@@ -343,25 +455,55 @@
 		return;
 	}
 	if (fully_active) {
-		event_log(EVENT_FLUSH, cycle, 0, NULL);
+		event_header(EVENT_FLUSH, cycle);
+		last = cycle;
+		
+		deflate_flush(0);
 	}
 	if (event_file) {
-		fwrite(buffer.data, 1, buffer.size, event_file);
+		fwrite(compressed, 1, output_stream.next_out - compressed, event_file);
 		fflush(event_file);
-		buffer.size = 0;
-		multi_count = 0;
-		last_event_type = 0xFF;
+		output_stream.next_out = compressed;
+		output_stream.avail_out = compressed_storage;
 	} else if (listen_sock) {
 		flush_socket();
 	}
 }
 
+static void init_event_reader_common(event_reader *reader)
+{
+	reader->last_cycle = 0;
+	reader->repeat_event = 0xFF;
+	reader->storage = 512 * 1024;
+	init_deserialize(&reader->buffer, malloc(reader->storage), reader->storage);
+	reader->buffer.size = 0;
+	memset(&reader->input_stream, 0, sizeof(reader->input_stream));
+	
+}
+
 void init_event_reader(event_reader *reader, uint8_t *data, size_t size)
 {
 	reader->socket = 0;
 	reader->last_cycle = 0;
 	reader->repeat_event = 0xFF;
-	init_deserialize(&reader->buffer, data, size);
+	init_event_reader_common(reader);
+	uint8_t name_len = data[1];
+	reader->buffer.size = name_len + 2;
+	memcpy(reader->buffer.data, data, reader->buffer.size);
+	reader->input_stream.next_in = data + reader->buffer.size;
+	reader->input_stream.avail_in = size - reader->buffer.size;
+	
+	int result = inflateInit(&reader->input_stream);
+	if (Z_OK != result) {
+		fatal_error("inflateInit returned %d\n", result);
+	}
+	reader->input_stream.next_out = reader->buffer.data + reader->buffer.size;
+	reader->input_stream.avail_out = reader->storage - reader->buffer.size;
+	result = inflate(&reader->input_stream, Z_NO_FLUSH);
+	if (Z_OK != result && Z_STREAM_END != result) {
+		fatal_error("inflate returned %d\n", result);
+	}
+	reader->buffer.size = reader->input_stream.next_out - reader->buffer.data;
 }
 
 void init_event_reader_tcp(event_reader *reader, char *address, char *port)
@@ -382,10 +524,10 @@
 		fatal_error("Failed to connect to %s:%s for event log stream\n", address, port);
 	}
 	
-	reader->storage = 512 * 1024;
-	reader->last_cycle = 0;
-	init_deserialize(&reader->buffer, malloc(reader->storage), reader->storage);
-	reader->buffer.size = 0;
+	init_event_reader_common(reader);
+	reader->socket_buffer_size = 256 * 1024;
+	reader->socket_buffer = malloc(reader->socket_buffer_size);
+	
 	while(reader->buffer.size < 3 || reader->buffer.size < 3 + reader->buffer.data[2])
 	{
 		int bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0);
@@ -394,6 +536,21 @@
 		}
 		reader->buffer.size += bytes;
 	}
+	size_t init_msg_len = 3 + reader->buffer.data[2];
+	memcpy(reader->socket_buffer, reader->buffer.data + init_msg_len, reader->buffer.size - init_msg_len);
+	reader->input_stream.next_in = reader->socket_buffer;
+	reader->input_stream.avail_in = reader->buffer.size - init_msg_len;
+	reader->buffer.size = init_msg_len;
+	int res = inflateInit(&reader->input_stream);
+	if (Z_OK != res) {
+		fatal_error("inflateInit returned %d\n", res);
+	}
+	reader->input_stream.next_out = reader->buffer.data + init_msg_len;
+	reader->input_stream.avail_out = reader->storage - init_msg_len;
+	res = inflate(&reader->input_stream, Z_NO_FLUSH);
+	if (Z_OK != res && Z_BUF_ERROR != res) {
+		fatal_error("inflate returned %d in init_event_reader_tcp\n", res);
+	}
 	socket_blocking(reader->socket, 0);
 	int flag = 1;
 	setsockopt(reader->socket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag));
@@ -401,35 +558,70 @@
 
 static void read_from_socket(event_reader *reader)
 {
-	if (reader->storage - (reader->buffer.size - reader->buffer.cur_pos) < 128 * 1024) {
-		reader->storage *= 2;
-		uint8_t *new_buf = malloc(reader->storage);
-		memcpy(new_buf, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos);
-		free(reader->buffer.data);
-		reader->buffer.data = new_buf;
-		reader->buffer.size -= reader->buffer.cur_pos;
-		reader->buffer.cur_pos = 0;
-	} else if (reader->buffer.cur_pos >= reader->buffer.size/2 && reader->buffer.size >= reader->storage/2) {
-		memmove(reader->buffer.data, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos);
-		reader->buffer.size -= reader->buffer.cur_pos;
-		reader->buffer.cur_pos = 0;
+	if (reader->socket_buffer_size - reader->input_stream.avail_in < 128 * 1024) {
+		reader->socket_buffer_size *= 2;
+		uint8_t *new_buf = malloc(reader->socket_buffer_size);
+		memcpy(new_buf, reader->input_stream.next_in, reader->input_stream.avail_in);
+		free(reader->socket_buffer);
+		reader->socket_buffer = new_buf;
+		reader->input_stream.next_in = new_buf;
+	} else if (
+		reader->input_stream.next_in - reader->socket_buffer >= reader->input_stream.avail_in 
+		&& reader->input_stream.next_in - reader->socket_buffer + reader->input_stream.avail_in >= reader->socket_buffer_size/2
+	) {
+		memmove(reader->socket_buffer, reader->input_stream.next_in, reader->input_stream.avail_in);
+		reader->input_stream.next_in = reader->socket_buffer;
 	}
-	int bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0);
+	uint8_t *space_start = reader->input_stream.next_in + reader->input_stream.avail_in;
+	size_t space = (reader->socket_buffer + reader->socket_buffer_size) - space_start;
+	int bytes = recv(reader->socket, space_start, space, 0);
 	if (bytes >= 0) {
-		reader->buffer.size += bytes;
+		reader->input_stream.avail_in += bytes;
 	} else if (!socket_error_is_wouldblock()) {
 		fatal_error("Connection closed, error = %X\n", socket_last_error());
 	}
 }
 
+static void inflate_flush(event_reader *reader)
+{
+	if (reader->buffer.cur_pos > reader->storage / 2) {
+		memmove(reader->buffer.data, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos);
+		reader->buffer.size -= reader->buffer.cur_pos;
+		reader->buffer.cur_pos = 0;
+		reader->input_stream.next_out = reader->buffer.data + reader->buffer.size;
+		reader->input_stream.avail_out = reader->storage - reader->buffer.size;
+	}
+	int result = inflate(&reader->input_stream, Z_SYNC_FLUSH);
+	if (Z_OK != result && Z_STREAM_END != result) {
+		fatal_error("inflate returned %d\n", result);
+	}
+	reader->buffer.size = reader->input_stream.next_out - reader->buffer.data;
+	if (result == Z_STREAM_END && (reader->socket || reader->input_stream.avail_in)) {
+		inflateReset(&reader->input_stream);
+		if (reader->input_stream.avail_in) {
+			inflate_flush(reader);
+		}
+	}
+	
+}
+
 void reader_ensure_data(event_reader *reader, size_t bytes)
 {
-	if (reader->socket && reader->buffer.size - reader->buffer.cur_pos < bytes) {
-		socket_blocking(reader->socket, 1);
-		while (reader->buffer.size - reader->buffer.cur_pos < bytes) {
+	if (reader->buffer.size - reader->buffer.cur_pos < bytes) {
+		if (reader->socket) {
 			read_from_socket(reader);
 		}
-		socket_blocking(reader->socket, 0);
+		if (reader->input_stream.avail_in) {
+			inflate_flush(reader);
+		}
+		if (reader->socket && reader->buffer.size - reader->buffer.cur_pos < bytes) {
+			socket_blocking(reader->socket, 1);
+			while (reader->buffer.size - reader->buffer.cur_pos < bytes) {
+				read_from_socket(reader);
+				inflate_flush(reader);
+			}
+			socket_blocking(reader->socket, 0);
+		}
 	}
 }
 
@@ -441,10 +633,7 @@
 		reader->last_cycle = *cycle_out;
 		return reader->repeat_event;
 	}
-	if (reader->socket) {
-		read_from_socket(reader);
-		reader_ensure_data(reader, 1);
-	}
+	reader_ensure_data(reader, 1);
 	uint8_t header = load_int8(&reader->buffer);
 	uint8_t ret;
 	uint32_t delta;
--- a/event_log.h	Sat May 02 00:52:21 2020 -0700
+++ b/event_log.h	Sat May 02 17:33:23 2020 -0700
@@ -20,14 +20,18 @@
 };
 
 #include "serialize.h"
+#include "zlib/zlib.h"
 typedef struct {
 	size_t storage;
+	uint8_t *socket_buffer;
+	size_t socket_buffer_size;
 	int socket;
 	uint32_t last_cycle;
 	uint32_t last_word_address;
 	uint32_t last_byte_address;
 	uint32_t repeat_delta;
 	deserialize_buffer buffer;
+	z_stream input_stream;
 	uint8_t repeat_event;
 	uint8_t repeat_remaining;
 } event_reader;
--- a/gen_player.c	Sat May 02 00:52:21 2020 -0700
+++ b/gen_player.c	Sat May 02 17:33:23 2020 -0700
@@ -88,6 +88,9 @@
 		}
 			
 		}
+		if (!player->reader.socket) {
+			reader_ensure_data(&player->reader, 1);
+		}
 	}
 	
 }
@@ -147,6 +150,7 @@
 {
 	gen_player *player = calloc(1, sizeof(gen_player));
 	player->reader = *reader;
+	inflateCopy(&player->reader.input_stream, &reader->input_stream);
 	config_common(player);
 	return player;
 }