# HG changeset patch # User Mike Pavone # Date 1588466003 25200 # Node ID 9c01945b5d20d1b8d451a65be0992f2d8f20cc2a # Parent ba06346611a1c0b9848cf0366297fa8125eaebb1 Use zlib to compress event log streams diff -r ba06346611a1 -r 9c01945b5d20 blastem.c --- a/blastem.c Sat May 02 00:52:21 2020 -0700 +++ b/blastem.c Sat May 02 17:33:23 2020 -0700 @@ -598,7 +598,6 @@ } else if (!loaded) { reader_port = parse_addr_port(argv[i]); if (reader_port) { - //init_event_reader_tcp(&reader, argv[i], port); reader_addr = argv[i]; } else { if (!(cart.size = load_rom(argv[i], &cart.buffer, stype == SYSTEM_UNKNOWN ? &stype : NULL))) { @@ -722,6 +721,8 @@ fatal_error("Failed to detect system type for %s\n", romfname); } game_system = current_system = alloc_config_player(stype, &reader); + //free inflate stream as it was inflateCopied to an internal event reader in the player + inflateEnd(&reader.input_stream); setup_saves(&cart, current_system); update_title(current_system->info.name); } diff -r ba06346611a1 -r 9c01945b5d20 event_log.c --- a/event_log.c Sat May 02 00:52:21 2020 -0700 +++ b/event_log.c Sat May 02 17:33:23 2020 -0700 @@ -15,6 +15,7 @@ #include "util.h" #include "blastem.h" #include "saves.h" +#include "zlib/zlib.h" enum { CMD_GAMEPAD_DOWN, @@ -24,9 +25,47 @@ static uint8_t active, fully_active; static FILE *event_file; static serialize_buffer buffer; +static uint8_t *compressed; +static size_t compressed_storage; +static z_stream output_stream; +static uint32_t last; + +static void event_log_common_init(void) +{ + init_serialize(&buffer); + compressed_storage = 128*1024; + compressed = malloc(compressed_storage); + deflateInit(&output_stream, 9); + output_stream.avail_out = compressed_storage; + output_stream.next_out = compressed; + output_stream.avail_in = 0; + output_stream.next_in = buffer.data; + last = 0; + active = 1; +} + +static uint8_t multi_count; +static size_t multi_start; +static void finish_multi(void) +{ + buffer.data[multi_start] |= multi_count - 2; + multi_count = 0; +} + +static void file_finish(void) +{ + fwrite(compressed, 1, output_stream.next_out - compressed, event_file); + output_stream.next_out = compressed; + output_stream.avail_out = compressed_storage; + int result = deflate(&output_stream, Z_FINISH); + if (Z_STREAM_END != result) { + fatal_error("Final deflate call returned %d\n", result); + } + fwrite(compressed, 1, output_stream.next_out - compressed, event_file); + fclose(event_file); +} static const char el_ident[] = "BLSTEL\x02\x00"; -static uint32_t last; void event_log_file(char *fname) { event_file = fopen(fname, "wb"); @@ -35,9 +74,9 @@ return; } fwrite(el_ident, 1, sizeof(el_ident) - 1, event_file); - init_serialize(&buffer); - active = fully_active = 1; - last = 0; + event_log_common_init(); + fully_active = 1; + atexit(file_finish); } static int listen_sock, remotes[7]; @@ -70,7 +109,7 @@ goto cleanup_address; } socket_blocking(listen_sock, 0); - active = 1; + event_log_common_init(); cleanup_address: freeaddrinfo(result); } @@ -90,12 +129,15 @@ } save_int8(&buffer, name_len); save_buffer8(&buffer, name, strlen(name)); - if (!fully_active) { + if (listen_sock) { system_start = malloc(buffer.size); system_start_size = buffer.size; memcpy(system_start, buffer.data, buffer.size); - buffer.size = 0; + } else { + //system start header is never compressed, so write to file immediately + fwrite(buffer.data, 1, buffer.size, event_file); } + buffer.size = 0; } //header formats @@ -106,21 +148,17 @@ #define FORMAT_4BYTE 0xF0 static uint8_t last_event_type = 0xFF; static uint32_t last_delta; -static uint8_t multi_count; -static size_t multi_start; static void event_header(uint8_t type, uint32_t cycle) { uint32_t delta = cycle - last; if (multi_count) { if (type != last_event_type || delta != last_delta) { - buffer.data[multi_start] |= multi_count - 2; - multi_count = 0; + finish_multi(); } else { ++multi_count; if (multi_count == 17) { - buffer.data[multi_start] |= multi_count - 2; + finish_multi(); last_event_type = 0xFF; - multi_count = 0; } return; } @@ -159,7 +197,7 @@ save_int32(&buffer, deduction); } -static size_t remote_send_progress[7]; +static uint8_t *remote_send_progress[7]; static uint8_t remote_needs_state[7]; static void flush_socket(void) { @@ -174,11 +212,11 @@ current_system->save_state = EVENTLOG_SLOT + 1; } } - size_t min_progress = 0; + uint8_t *min_progress = compressed; for (int i = 0; i < num_remotes; i++) { int sent = 1; if (remote_needs_state[i]) { - remote_send_progress[i] = buffer.size; + remote_send_progress[i] = output_stream.next_out; } else { uint8_t buffer[1500]; int bytes = recv(remotes[i], buffer, sizeof(buffer), 0); @@ -210,9 +248,9 @@ } } } - while (sent && buffer.size - remote_send_progress[i]) + while (sent && output_stream.next_out > remote_send_progress[i]) { - sent = send(remotes[i], buffer.data + remote_send_progress[i], buffer.size - remote_send_progress[i], 0); + sent = send(remotes[i], remote_send_progress[i], output_stream.next_out - remote_send_progress[i], 0); if (sent >= 0) { remote_send_progress[i] += sent; } else if (socket_error_is_wouldblock()) { @@ -229,11 +267,12 @@ } } } - if (min_progress == buffer.size) { - buffer.size = 0; - memset(remote_send_progress, 0, sizeof(remote_send_progress)); - multi_count = 0; - last_event_type = 0xFF; + if (min_progress == output_stream.next_out) { + output_stream.next_out = compressed; + output_stream.avail_out = compressed_storage; + for (int i = 0; i < num_remotes; i++) { + remote_send_progress[i] = compressed; + } } } @@ -245,13 +284,26 @@ event_header(type, cycle); last = cycle; save_buffer8(&buffer, payload, size); - if (listen_sock && buffer.size > 1280) { - if (multi_count) { - buffer.data[multi_start] |= multi_count - 2; - multi_count = 0; - last_event_type = 0xFF; + if (!multi_count) { + last_event_type = 0xFF; + output_stream.avail_in = buffer.size - (output_stream.next_in - buffer.data); + int result = deflate(&output_stream, Z_NO_FLUSH); + if (result != Z_OK) { + fatal_error("deflate returned %d\n", result); } - flush_socket(); + if (listen_sock) { + if ((output_stream.next_out - compressed) > 1280 || !output_stream.avail_out) { + flush_socket(); + } + } else if (!output_stream.avail_out) { + fwrite(compressed, 1, compressed_storage, event_file); + output_stream.next_out = compressed; + output_stream.avail_out = compressed_storage; + } + if (!output_stream.avail_in) { + buffer.size = 0; + output_stream.next_in = buffer.data; + } } } @@ -300,6 +352,39 @@ return total; } +void deflate_flush(uint8_t full) +{ + output_stream.avail_in = buffer.size - (output_stream.next_in - buffer.data); + while (output_stream.avail_in) + { + if (!output_stream.avail_out) { + size_t old_storage = compressed_storage; + uint8_t *old_compressed = compressed; + compressed_storage *= 2; + compressed = realloc(compressed, compressed_storage); + output_stream.next_out = compressed + old_storage; + output_stream.avail_out = old_storage; + for (int i = 0; i < num_remotes; i++) { + if (!remote_needs_state[i]) { + remote_send_progress[i] = compressed + (remote_send_progress[i] - old_compressed); + } + } + } + int result = deflate(&output_stream, full ? Z_FINISH : Z_SYNC_FLUSH); + if (result != (full ? Z_STREAM_END : Z_OK)) { + fatal_error("deflate returned %d\n", result); + } + if (full) { + result = deflateReset(&output_stream); + if (result != Z_OK) { + fatal_error("deflateReset returned %d\n", result); + } + } + } + output_stream.next_in = buffer.data; + buffer.size = 0; +} + void event_state(uint32_t cycle, serialize_buffer *state) { if (!fully_active) { @@ -311,20 +396,12 @@ last_byte_address >> 8, last_byte_address, state->size >> 16, state->size >> 8, state->size }; + uint8_t sent_system_start = 0; for (int i = 0; i < num_remotes; i++) { if (remote_needs_state[i]) { - if( - send_all(remotes[i], system_start, system_start_size, 0) == system_start_size - && send_all(remotes[i], header, sizeof(header), 0) == sizeof(header) - && send_all(remotes[i], state->data, state->size, 0) == state->size - ) { - remote_send_progress[i] = buffer.size; - remote_needs_state[i] = 0; - socket_blocking(remotes[i], 0); - int flag = 1; - setsockopt(remotes[i], IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag)); - fully_active = 1; + if (send_all(remotes[i], system_start, system_start_size, 0) == system_start_size) { + sent_system_start = 1; } else { socket_close(remotes[i]); remotes[i] = remotes[num_remotes-1]; @@ -335,6 +412,41 @@ } } } + if (sent_system_start) { + if (fully_active) { + if (multi_count) { + finish_multi(); + } + //full flush is needed so new and old clients can share a stream + deflate_flush(1); + } + save_buffer8(&buffer, header, sizeof(header)); + save_buffer8(&buffer, state->data, state->size); + size_t old_compressed_size = output_stream.next_out - compressed; + deflate_flush(1); + size_t state_size = output_stream.next_out - compressed - old_compressed_size; + for (int i = 0; i < num_remotes; i++) { + if (remote_needs_state[i]) { + if (send_all(remotes[i], compressed + old_compressed_size, state_size, 0) == state_size) { + remote_send_progress[i] = compressed + old_compressed_size; + remote_needs_state[i] = 0; + socket_blocking(remotes[i], 0); + int flag = 1; + setsockopt(remotes[i], IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag)); + fully_active = 1; + } else { + socket_close(remotes[i]); + remotes[i] = remotes[num_remotes-1]; + remote_send_progress[i] = remote_send_progress[num_remotes-1]; + remote_needs_state[i] = remote_needs_state[num_remotes-1]; + num_remotes--; + i--; + } + } + } + output_stream.next_out = compressed + old_compressed_size; + output_stream.avail_out = compressed_storage - old_compressed_size; + } } void event_flush(uint32_t cycle) @@ -343,25 +455,55 @@ return; } if (fully_active) { - event_log(EVENT_FLUSH, cycle, 0, NULL); + event_header(EVENT_FLUSH, cycle); + last = cycle; + + deflate_flush(0); } if (event_file) { - fwrite(buffer.data, 1, buffer.size, event_file); + fwrite(compressed, 1, output_stream.next_out - compressed, event_file); fflush(event_file); - buffer.size = 0; - multi_count = 0; - last_event_type = 0xFF; + output_stream.next_out = compressed; + output_stream.avail_out = compressed_storage; } else if (listen_sock) { flush_socket(); } } +static void init_event_reader_common(event_reader *reader) +{ + reader->last_cycle = 0; + reader->repeat_event = 0xFF; + reader->storage = 512 * 1024; + init_deserialize(&reader->buffer, malloc(reader->storage), reader->storage); + reader->buffer.size = 0; + memset(&reader->input_stream, 0, sizeof(reader->input_stream)); + +} + void init_event_reader(event_reader *reader, uint8_t *data, size_t size) { reader->socket = 0; reader->last_cycle = 0; reader->repeat_event = 0xFF; - init_deserialize(&reader->buffer, data, size); + init_event_reader_common(reader); + uint8_t name_len = data[1]; + reader->buffer.size = name_len + 2; + memcpy(reader->buffer.data, data, reader->buffer.size); + reader->input_stream.next_in = data + reader->buffer.size; + reader->input_stream.avail_in = size - reader->buffer.size; + + int result = inflateInit(&reader->input_stream); + if (Z_OK != result) { + fatal_error("inflateInit returned %d\n", result); + } + reader->input_stream.next_out = reader->buffer.data + reader->buffer.size; + reader->input_stream.avail_out = reader->storage - reader->buffer.size; + result = inflate(&reader->input_stream, Z_NO_FLUSH); + if (Z_OK != result && Z_STREAM_END != result) { + fatal_error("inflate returned %d\n", result); + } + reader->buffer.size = reader->input_stream.next_out - reader->buffer.data; } void init_event_reader_tcp(event_reader *reader, char *address, char *port) @@ -382,10 +524,10 @@ fatal_error("Failed to connect to %s:%s for event log stream\n", address, port); } - reader->storage = 512 * 1024; - reader->last_cycle = 0; - init_deserialize(&reader->buffer, malloc(reader->storage), reader->storage); - reader->buffer.size = 0; + init_event_reader_common(reader); + reader->socket_buffer_size = 256 * 1024; + reader->socket_buffer = malloc(reader->socket_buffer_size); + while(reader->buffer.size < 3 || reader->buffer.size < 3 + reader->buffer.data[2]) { int bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0); @@ -394,6 +536,21 @@ } reader->buffer.size += bytes; } + size_t init_msg_len = 3 + reader->buffer.data[2]; + memcpy(reader->socket_buffer, reader->buffer.data + init_msg_len, reader->buffer.size - init_msg_len); + reader->input_stream.next_in = reader->socket_buffer; + reader->input_stream.avail_in = reader->buffer.size - init_msg_len; + reader->buffer.size = init_msg_len; + int res = inflateInit(&reader->input_stream); + if (Z_OK != res) { + fatal_error("inflateInit returned %d\n", res); + } + reader->input_stream.next_out = reader->buffer.data + init_msg_len; + reader->input_stream.avail_out = reader->storage - init_msg_len; + res = inflate(&reader->input_stream, Z_NO_FLUSH); + if (Z_OK != res && Z_BUF_ERROR != res) { + fatal_error("inflate returned %d in init_event_reader_tcp\n", res); + } socket_blocking(reader->socket, 0); int flag = 1; setsockopt(reader->socket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag)); @@ -401,35 +558,70 @@ static void read_from_socket(event_reader *reader) { - if (reader->storage - (reader->buffer.size - reader->buffer.cur_pos) < 128 * 1024) { - reader->storage *= 2; - uint8_t *new_buf = malloc(reader->storage); - memcpy(new_buf, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos); - free(reader->buffer.data); - reader->buffer.data = new_buf; - reader->buffer.size -= reader->buffer.cur_pos; - reader->buffer.cur_pos = 0; - } else if (reader->buffer.cur_pos >= reader->buffer.size/2 && reader->buffer.size >= reader->storage/2) { - memmove(reader->buffer.data, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos); - reader->buffer.size -= reader->buffer.cur_pos; - reader->buffer.cur_pos = 0; + if (reader->socket_buffer_size - reader->input_stream.avail_in < 128 * 1024) { + reader->socket_buffer_size *= 2; + uint8_t *new_buf = malloc(reader->socket_buffer_size); + memcpy(new_buf, reader->input_stream.next_in, reader->input_stream.avail_in); + free(reader->socket_buffer); + reader->socket_buffer = new_buf; + reader->input_stream.next_in = new_buf; + } else if ( + reader->input_stream.next_in - reader->socket_buffer >= reader->input_stream.avail_in + && reader->input_stream.next_in - reader->socket_buffer + reader->input_stream.avail_in >= reader->socket_buffer_size/2 + ) { + memmove(reader->socket_buffer, reader->input_stream.next_in, reader->input_stream.avail_in); + reader->input_stream.next_in = reader->socket_buffer; } - int bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0); + uint8_t *space_start = reader->input_stream.next_in + reader->input_stream.avail_in; + size_t space = (reader->socket_buffer + reader->socket_buffer_size) - space_start; + int bytes = recv(reader->socket, space_start, space, 0); if (bytes >= 0) { - reader->buffer.size += bytes; + reader->input_stream.avail_in += bytes; } else if (!socket_error_is_wouldblock()) { fatal_error("Connection closed, error = %X\n", socket_last_error()); } } +static void inflate_flush(event_reader *reader) +{ + if (reader->buffer.cur_pos > reader->storage / 2) { + memmove(reader->buffer.data, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos); + reader->buffer.size -= reader->buffer.cur_pos; + reader->buffer.cur_pos = 0; + reader->input_stream.next_out = reader->buffer.data + reader->buffer.size; + reader->input_stream.avail_out = reader->storage - reader->buffer.size; + } + int result = inflate(&reader->input_stream, Z_SYNC_FLUSH); + if (Z_OK != result && Z_STREAM_END != result) { + fatal_error("inflate returned %d\n", result); + } + reader->buffer.size = reader->input_stream.next_out - reader->buffer.data; + if (result == Z_STREAM_END && (reader->socket || reader->input_stream.avail_in)) { + inflateReset(&reader->input_stream); + if (reader->input_stream.avail_in) { + inflate_flush(reader); + } + } + +} + void reader_ensure_data(event_reader *reader, size_t bytes) { - if (reader->socket && reader->buffer.size - reader->buffer.cur_pos < bytes) { - socket_blocking(reader->socket, 1); - while (reader->buffer.size - reader->buffer.cur_pos < bytes) { + if (reader->buffer.size - reader->buffer.cur_pos < bytes) { + if (reader->socket) { read_from_socket(reader); } - socket_blocking(reader->socket, 0); + if (reader->input_stream.avail_in) { + inflate_flush(reader); + } + if (reader->socket && reader->buffer.size - reader->buffer.cur_pos < bytes) { + socket_blocking(reader->socket, 1); + while (reader->buffer.size - reader->buffer.cur_pos < bytes) { + read_from_socket(reader); + inflate_flush(reader); + } + socket_blocking(reader->socket, 0); + } } } @@ -441,10 +633,7 @@ reader->last_cycle = *cycle_out; return reader->repeat_event; } - if (reader->socket) { - read_from_socket(reader); - reader_ensure_data(reader, 1); - } + reader_ensure_data(reader, 1); uint8_t header = load_int8(&reader->buffer); uint8_t ret; uint32_t delta; diff -r ba06346611a1 -r 9c01945b5d20 event_log.h --- a/event_log.h Sat May 02 00:52:21 2020 -0700 +++ b/event_log.h Sat May 02 17:33:23 2020 -0700 @@ -20,14 +20,18 @@ }; #include "serialize.h" +#include "zlib/zlib.h" typedef struct { size_t storage; + uint8_t *socket_buffer; + size_t socket_buffer_size; int socket; uint32_t last_cycle; uint32_t last_word_address; uint32_t last_byte_address; uint32_t repeat_delta; deserialize_buffer buffer; + z_stream input_stream; uint8_t repeat_event; uint8_t repeat_remaining; } event_reader; diff -r ba06346611a1 -r 9c01945b5d20 gen_player.c --- a/gen_player.c Sat May 02 00:52:21 2020 -0700 +++ b/gen_player.c Sat May 02 17:33:23 2020 -0700 @@ -88,6 +88,9 @@ } } + if (!player->reader.socket) { + reader_ensure_data(&player->reader, 1); + } } } @@ -147,6 +150,7 @@ { gen_player *player = calloc(1, sizeof(gen_player)); player->reader = *reader; + inflateCopy(&player->reader.input_stream, &reader->input_stream); config_common(player); return player; }