Mercurial > repos > blastem
view event_log.c @ 2321:2eda5f81f91e
More fully baked ROM db support for SMS
author | Michael Pavone <pavone@retrodev.com> |
---|---|
date | Thu, 15 Jun 2023 09:36:11 -0700 |
parents | a042e046f7f2 |
children |
line wrap: on
line source
#ifdef _WIN32 #define WINVER 0x501 #include <winsock2.h> #include <ws2tcpip.h> #else #include <sys/types.h> #include <sys/socket.h> #include <unistd.h> #include <netdb.h> #include <netinet/tcp.h> #endif #include <stdlib.h> #include <string.h> #include <errno.h> #include "event_log.h" #include "util.h" #include "blastem.h" #include "saves.h" #include "zlib/zlib.h" enum { CMD_GAMEPAD_DOWN, CMD_GAMEPAD_UP, }; static uint8_t active, fully_active; static FILE *event_file; static serialize_buffer buffer; static uint8_t *compressed; static size_t compressed_storage; static z_stream output_stream; static uint32_t last; static void event_log_common_init(void) { init_serialize(&buffer); compressed_storage = 128*1024; compressed = malloc(compressed_storage); deflateInit(&output_stream, 9); output_stream.avail_out = compressed_storage; output_stream.next_out = compressed; output_stream.avail_in = 0; output_stream.next_in = buffer.data; last = 0; active = 1; } static uint8_t multi_count; static size_t multi_start; static void finish_multi(void) { buffer.data[multi_start] |= multi_count - 2; multi_count = 0; } static void file_finish(void) { fwrite(compressed, 1, output_stream.next_out - compressed, event_file); output_stream.next_out = compressed; output_stream.avail_out = compressed_storage; int result = deflate(&output_stream, Z_FINISH); if (Z_STREAM_END != result) { fatal_error("Final deflate call returned %d\n", result); } fwrite(compressed, 1, output_stream.next_out - compressed, event_file); fclose(event_file); } static const char el_ident[] = "BLSTEL\x02\x00"; void event_log_file(char *fname) { event_file = fopen(fname, "wb"); if (!event_file) { warning("Failed to open event file %s for writing\n", fname); return; } fwrite(el_ident, 1, sizeof(el_ident) - 1, event_file); event_log_common_init(); fully_active = 1; atexit(file_finish); } typedef struct { uint8_t *send_progress; int sock; uint8_t players[1]; //TODO: Expand when support for multiple players per remote is added uint8_t num_players; } remote; static int listen_sock; static remote remotes[7]; static int num_remotes; static uint8_t available_players[7] = {2,3,4,5,6,7,8}; static int num_available_players = 7; void event_log_tcp(char *address, char *port) { struct addrinfo request, *result; socket_init(); memset(&request, 0, sizeof(request)); request.ai_family = AF_INET; request.ai_socktype = SOCK_STREAM; request.ai_flags = AI_PASSIVE; getaddrinfo(address, port, &request, &result); listen_sock = socket(result->ai_family, result->ai_socktype, result->ai_protocol); if (listen_sock < 0) { warning("Failed to open event log listen socket on %s:%s\n", address, port); goto cleanup_address; } int param = 1; setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, (const char *)¶m, sizeof(param)); if (bind(listen_sock, result->ai_addr, result->ai_addrlen) < 0) { warning("Failed to bind event log listen socket on %s:%s\n", address, port); socket_close(listen_sock); goto cleanup_address; } if (listen(listen_sock, 3) < 0) { warning("Failed to listen for event log remotes on %s:%s\n", address, port); socket_close(listen_sock); goto cleanup_address; } socket_blocking(listen_sock, 0); event_log_common_init(); cleanup_address: freeaddrinfo(result); } static uint8_t *system_start; static size_t system_start_size; void event_system_start(system_type stype, vid_std video_std, char *name) { if (!active) { return; } save_int8(&buffer, stype); save_int8(&buffer, video_std); size_t name_len = strlen(name); if (name_len > 255) { name_len = 255; } save_int8(&buffer, name_len); save_buffer8(&buffer, name, strlen(name)); if (listen_sock) { system_start = malloc(buffer.size); system_start_size = buffer.size; memcpy(system_start, buffer.data, buffer.size); } else { //system start header is never compressed, so write to file immediately fwrite(buffer.data, 1, buffer.size, event_file); } buffer.size = 0; } //header formats //Single byte: 4 bit type, 4 bit delta (16-31) //Three Byte: 8 bit type, 16-bit delta //Four byte: 8-bit type, 24-bit signed delta #define FORMAT_3BYTE 0xE0 #define FORMAT_4BYTE 0xF0 static uint8_t last_event_type = 0xFF; static uint32_t last_delta; static void event_header(uint8_t type, uint32_t cycle) { uint32_t delta = cycle - last; if (multi_count) { if (type != last_event_type || delta != last_delta) { finish_multi(); } else { ++multi_count; if (multi_count == 17) { finish_multi(); last_event_type = 0xFF; } return; } } else if (type == last_event_type && delta == last_delta && type != EVENT_FLUSH) { //make some room save_int8(&buffer, 0); //shift existing command memmove(buffer.data + multi_start + 1, buffer.data + multi_start, buffer.size - multi_start - 1); buffer.data[multi_start] = EVENT_MULTI << 4; multi_count = 2; return; } multi_start = buffer.size; last_event_type = type; last_delta = delta; if (delta > 65535) { save_int8(&buffer, FORMAT_4BYTE | type); save_int8(&buffer, delta >> 16); save_int16(&buffer, delta); } else if (delta >= 16 && delta < 32) { save_int8(&buffer, type << 4 | (delta - 16)); } else { save_int8(&buffer, FORMAT_3BYTE | type); save_int16(&buffer, delta); } } void event_cycle_adjust(uint32_t cycle, uint32_t deduction) { if (!fully_active) { return; } event_header(EVENT_ADJUST, cycle); last = cycle - deduction; save_int32(&buffer, deduction); } static uint8_t next_available_player(void) { uint8_t lowest = 0xFF; int lowest_index = -1; for (int i = 0; i < num_available_players; i++) { if (available_players[i] < lowest) { lowest = available_players[i]; lowest_index = i; } } if (lowest_index >= 0) { available_players[lowest_index] = available_players[num_available_players - 1]; --num_available_players; } return lowest; } static void flush_socket(void) { int remote_sock = accept(listen_sock, NULL, NULL); if (remote_sock != -1) { if (num_remotes == 7) { socket_close(remote_sock); } else { printf("remote %d connected\n", num_remotes); uint8_t player = next_available_player(); remotes[num_remotes++] = (remote){ .sock = remote_sock, .send_progress = NULL, .players = {player}, .num_players = player == 0xFF ? 0 : 1 }; current_system->save_state = EVENTLOG_SLOT + 1; } } uint8_t *min_progress = compressed; for (int i = 0; i < num_remotes; i++) { if (remotes[i].send_progress) { uint8_t recv_buffer[1500]; int bytes = recv(remotes[i].sock, recv_buffer, sizeof(recv_buffer), 0); for (int j = 0; j < bytes; j++) { uint8_t cmd = recv_buffer[j]; switch(cmd) { case CMD_GAMEPAD_DOWN: case CMD_GAMEPAD_UP: { ++j; if (j < bytes) { uint8_t button = recv_buffer[j]; uint8_t pad = (button >> 5) - 1; button &= 0x1F; if (pad < remotes[i].num_players) { pad = remotes[i].players[pad]; if (cmd == CMD_GAMEPAD_DOWN) { current_system->gamepad_down(current_system, pad, button); } else { current_system->gamepad_up(current_system, pad, button); } } } else { warning("Received incomplete command %X\n", cmd); } break; } default: warning("Unrecognized remote command %X\n", cmd); j = bytes; } } int sent = 1; while (sent && output_stream.next_out > remotes[i].send_progress) { sent = send(remotes[i].sock, remotes[i].send_progress, output_stream.next_out - remotes[i].send_progress, 0); if (sent >= 0) { remotes[i].send_progress += sent; } else if (!socket_error_is_wouldblock()) { socket_close(remotes[i].sock); for (int j = 0; j < remotes[i].num_players; j++) { available_players[num_available_players++] = remotes[i].players[j]; } remotes[i] = remotes[num_remotes-1]; num_remotes--; if (!num_remotes) { //last remote disconnected, reset buffers/deflate fully_active = 0; deflateReset(&output_stream); output_stream.next_out = compressed; output_stream.avail_out = compressed_storage; buffer.size = 0; } i--; break; } if (remotes[i].send_progress > min_progress) { min_progress = remotes[i].send_progress; } } } } if (min_progress == output_stream.next_out) { output_stream.next_out = compressed; output_stream.avail_out = compressed_storage; for (int i = 0; i < num_remotes; i++) { if (remotes[i].send_progress) { remotes[i].send_progress = compressed; } } } } uint8_t wrote_since_last_flush; void event_log(uint8_t type, uint32_t cycle, uint8_t size, uint8_t *payload) { if (!fully_active) { return; } event_header(type, cycle); last = cycle; save_buffer8(&buffer, payload, size); if (!multi_count) { last_event_type = 0xFF; output_stream.avail_in = buffer.size - (output_stream.next_in - buffer.data); int result = deflate(&output_stream, Z_NO_FLUSH); if (result != Z_OK) { fatal_error("deflate returned %d\n", result); } if (listen_sock) { if ((output_stream.next_out - compressed) > 1280 || !output_stream.avail_out) { flush_socket(); wrote_since_last_flush = 1; } } else if (!output_stream.avail_out) { fwrite(compressed, 1, compressed_storage, event_file); output_stream.next_out = compressed; output_stream.avail_out = compressed_storage; } if (!output_stream.avail_in) { buffer.size = 0; output_stream.next_in = buffer.data; } } } static uint32_t last_word_address; void event_vram_word(uint32_t cycle, uint32_t address, uint16_t value) { uint32_t delta = address - last_word_address; if (delta < 256) { uint8_t buffer[3] = {delta, value >> 8, value}; event_log(EVENT_VRAM_WORD_DELTA, cycle, sizeof(buffer), buffer); } else { uint8_t buffer[5] = {address >> 16, address >> 8, address, value >> 8, value}; event_log(EVENT_VRAM_WORD, cycle, sizeof(buffer), buffer); } last_word_address = address; } static uint32_t last_byte_address; void event_vram_byte(uint32_t cycle, uint16_t address, uint8_t byte, uint8_t auto_inc) { uint32_t delta = address - last_byte_address; if (delta == 1) { event_log(EVENT_VRAM_BYTE_ONE, cycle, sizeof(byte), &byte); } else if (delta == auto_inc) { event_log(EVENT_VRAM_BYTE_AUTO, cycle, sizeof(byte), &byte); } else if (delta < 256) { uint8_t buffer[2] = {delta, byte}; event_log(EVENT_VRAM_BYTE_DELTA, cycle, sizeof(buffer), buffer); } else { uint8_t buffer[3] = {address >> 8, address, byte}; event_log(EVENT_VRAM_BYTE, cycle, sizeof(buffer), buffer); } last_byte_address = address; } static size_t send_all(int sock, uint8_t *data, size_t size, int flags) { size_t total = 0, sent = 1; while(sent > 0 && total < size) { sent = send(sock, data + total, size - total, flags); if (sent > 0) { total += sent; } } return total; } void deflate_flush(uint8_t full) { output_stream.avail_in = buffer.size - (output_stream.next_in - buffer.data); uint8_t force = full; while (output_stream.avail_in || force) { if (!output_stream.avail_out) { size_t old_storage = compressed_storage; uint8_t *old_compressed = compressed; compressed_storage *= 2; compressed = realloc(compressed, compressed_storage); output_stream.next_out = compressed + old_storage; output_stream.avail_out = old_storage; for (int i = 0; i < num_remotes; i++) { if (remotes[i].send_progress) { remotes[i].send_progress = compressed + (remotes[i].send_progress - old_compressed); } } } int result = deflate(&output_stream, full ? Z_FINISH : Z_SYNC_FLUSH); if (result != (full ? Z_STREAM_END : Z_OK)) { fatal_error("deflate returned %d\n", result); } if (full && result == Z_STREAM_END) { result = deflateReset(&output_stream); if (result != Z_OK) { fatal_error("deflateReset returned %d\n", result); } } force = 0; } output_stream.next_in = buffer.data; buffer.size = 0; } void event_state(uint32_t cycle, serialize_buffer *state) { if (!fully_active) { last = cycle; } uint8_t header[] = { EVENT_STATE << 4, last >> 24, last >> 16, last >> 8, last, last_word_address >> 16, last_word_address >> 8, last_word_address, last_byte_address >> 8, last_byte_address, state->size >> 16, state->size >> 8, state->size }; uint8_t sent_system_start = 0; for (int i = 0; i < num_remotes; i++) { if (!remotes[i].send_progress) { if (send_all(remotes[i].sock, system_start, system_start_size, 0) == system_start_size) { sent_system_start = 1; } else { socket_close(remotes[i].sock); remotes[i] = remotes[num_remotes-1]; num_remotes--; i--; } } } if (sent_system_start) { if (fully_active) { if (multi_count) { finish_multi(); } //full flush is needed so new and old clients can share a stream deflate_flush(1); } save_buffer8(&buffer, header, sizeof(header)); save_buffer8(&buffer, state->data, state->size); size_t old_compressed_size = output_stream.next_out - compressed; deflate_flush(1); size_t state_size = output_stream.next_out - compressed - old_compressed_size; for (int i = 0; i < num_remotes; i++) { if (!remotes[i].send_progress) { if (send_all(remotes[i].sock, compressed + old_compressed_size, state_size, 0) == state_size) { remotes[i].send_progress = compressed + old_compressed_size; socket_blocking(remotes[i].sock, 0); int flag = 1; setsockopt(remotes[i].sock, IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag)); fully_active = 1; } else { socket_close(remotes[i].sock); remotes[i] = remotes[num_remotes-1]; num_remotes--; i--; } } } output_stream.next_out = compressed + old_compressed_size; output_stream.avail_out = compressed_storage - old_compressed_size; } } void event_flush(uint32_t cycle) { if (!active) { return; } if (fully_active) { event_header(EVENT_FLUSH, cycle); last = cycle; deflate_flush(0); } if (event_file) { fwrite(compressed, 1, output_stream.next_out - compressed, event_file); fflush(event_file); output_stream.next_out = compressed; output_stream.avail_out = compressed_storage; } else if (listen_sock) { flush_socket(); wrote_since_last_flush = 0; } } void event_soft_flush(uint32_t cycle) { if (!fully_active || wrote_since_last_flush || event_file) { return; } event_header(EVENT_FLUSH, cycle); last = cycle; deflate_flush(0); flush_socket(); } static void init_event_reader_common(event_reader *reader) { reader->last_cycle = 0; reader->repeat_event = 0xFF; reader->storage = 512 * 1024; init_deserialize(&reader->buffer, malloc(reader->storage), reader->storage); reader->buffer.size = 0; memset(&reader->input_stream, 0, sizeof(reader->input_stream)); } void init_event_reader(event_reader *reader, uint8_t *data, size_t size) { reader->socket = 0; reader->last_cycle = 0; reader->repeat_event = 0xFF; init_event_reader_common(reader); uint8_t name_len = data[1]; reader->buffer.size = name_len + 2; memcpy(reader->buffer.data, data, reader->buffer.size); reader->input_stream.next_in = data + reader->buffer.size; reader->input_stream.avail_in = size - reader->buffer.size; int result = inflateInit(&reader->input_stream); if (Z_OK != result) { fatal_error("inflateInit returned %d\n", result); } reader->input_stream.next_out = reader->buffer.data + reader->buffer.size; reader->input_stream.avail_out = reader->storage - reader->buffer.size; result = inflate(&reader->input_stream, Z_NO_FLUSH); if (Z_OK != result && Z_STREAM_END != result) { fatal_error("inflate returned %d\n", result); } reader->buffer.size = reader->input_stream.next_out - reader->buffer.data; } void init_event_reader_tcp(event_reader *reader, char *address, char *port) { struct addrinfo request, *result; socket_init(); memset(&request, 0, sizeof(request)); request.ai_family = AF_INET; request.ai_socktype = SOCK_STREAM; request.ai_flags = AI_PASSIVE; getaddrinfo(address, port, &request, &result); reader->socket = socket(result->ai_family, result->ai_socktype, result->ai_protocol); if (reader->socket < 0) { fatal_error("Failed to create socket for event log connection to %s:%s\n", address, port); } if (connect(reader->socket, result->ai_addr, result->ai_addrlen) < 0) { fatal_error("Failed to connect to %s:%s for event log stream\n", address, port); } init_event_reader_common(reader); reader->socket_buffer_size = 256 * 1024; reader->socket_buffer = malloc(reader->socket_buffer_size); while(reader->buffer.size < 3 || reader->buffer.size < 3 + reader->buffer.data[2]) { int bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0); if (bytes < 0) { fatal_error("Failed to receive system init from %s:%s\n", address, port); } reader->buffer.size += bytes; } size_t init_msg_len = 3 + reader->buffer.data[2]; memcpy(reader->socket_buffer, reader->buffer.data + init_msg_len, reader->buffer.size - init_msg_len); reader->input_stream.next_in = reader->socket_buffer; reader->input_stream.avail_in = reader->buffer.size - init_msg_len; reader->buffer.size = init_msg_len; int res = inflateInit(&reader->input_stream); if (Z_OK != res) { fatal_error("inflateInit returned %d\n", res); } reader->input_stream.next_out = reader->buffer.data + init_msg_len; reader->input_stream.avail_out = reader->storage - init_msg_len; res = inflate(&reader->input_stream, Z_NO_FLUSH); if (Z_OK != res && Z_BUF_ERROR != res) { fatal_error("inflate returned %d in init_event_reader_tcp\n", res); } int flag = 1; setsockopt(reader->socket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag)); } static void read_from_socket(event_reader *reader) { if (reader->socket_buffer_size - reader->input_stream.avail_in < 128 * 1024) { reader->socket_buffer_size *= 2; uint8_t *new_buf = malloc(reader->socket_buffer_size); memcpy(new_buf, reader->input_stream.next_in, reader->input_stream.avail_in); free(reader->socket_buffer); reader->socket_buffer = new_buf; reader->input_stream.next_in = new_buf; } else if ( reader->input_stream.next_in - reader->socket_buffer >= reader->input_stream.avail_in && reader->input_stream.next_in - reader->socket_buffer + reader->input_stream.avail_in >= reader->socket_buffer_size/2 ) { memmove(reader->socket_buffer, reader->input_stream.next_in, reader->input_stream.avail_in); reader->input_stream.next_in = reader->socket_buffer; } uint8_t *space_start = reader->input_stream.next_in + reader->input_stream.avail_in; size_t space = (reader->socket_buffer + reader->socket_buffer_size) - space_start; int bytes = recv(reader->socket, space_start, space, 0); if (bytes >= 0) { reader->input_stream.avail_in += bytes; } else if (!socket_error_is_wouldblock()) { fatal_error("Connection closed, error = %X\n", socket_last_error()); } } static void inflate_flush(event_reader *reader) { if (reader->buffer.cur_pos > reader->storage / 2) { memmove(reader->buffer.data, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos); reader->buffer.size -= reader->buffer.cur_pos; reader->buffer.cur_pos = 0; reader->input_stream.next_out = reader->buffer.data + reader->buffer.size; reader->input_stream.avail_out = reader->storage - reader->buffer.size; } int result = inflate(&reader->input_stream, Z_SYNC_FLUSH); if (Z_OK != result && Z_STREAM_END != result) { fatal_error("inflate returned %d\n", result); } reader->buffer.size = reader->input_stream.next_out - reader->buffer.data; if (result == Z_STREAM_END && (reader->socket || reader->input_stream.avail_in)) { inflateReset(&reader->input_stream); if (reader->input_stream.avail_in) { inflate_flush(reader); } } } void reader_ensure_data(event_reader *reader, size_t bytes) { if (reader->buffer.size - reader->buffer.cur_pos < bytes) { if (reader->input_stream.avail_in) { inflate_flush(reader); } if (reader->socket) { while (reader->buffer.size - reader->buffer.cur_pos < bytes) { read_from_socket(reader); inflate_flush(reader); } } } } uint8_t reader_next_event(event_reader *reader, uint32_t *cycle_out) { if (reader->repeat_remaining) { reader->repeat_remaining--; *cycle_out = reader->last_cycle + reader->repeat_delta; reader->last_cycle = *cycle_out; return reader->repeat_event; } reader_ensure_data(reader, 1); uint8_t header = load_int8(&reader->buffer); uint8_t ret; uint32_t delta; uint8_t multi_start = 0; if ((header & 0xF0) == (EVENT_MULTI << 4)) { reader->repeat_remaining = (header & 0xF) + 1; multi_start = 1; reader_ensure_data(reader, 1); header = load_int8(&reader->buffer); } if ((header & 0xF0) < FORMAT_3BYTE) { delta = (header & 0xF) + 16; ret = header >> 4; } else if ((header & 0xF0) == FORMAT_3BYTE) { reader_ensure_data(reader, 2); delta = load_int16(&reader->buffer); ret = header & 0xF; } else { reader_ensure_data(reader, 3); delta = load_int8(&reader->buffer) << 16; //sign extend 24-bit delta to 32-bit if (delta & 0x800000) { delta |= 0xFF000000; } delta |= load_int16(&reader->buffer); ret = header & 0xF; } if (multi_start) { reader->repeat_event = ret; reader->repeat_delta = delta; } *cycle_out = reader->last_cycle + delta; reader->last_cycle = *cycle_out; if (ret == EVENT_ADJUST) { reader_ensure_data(reader, 4); size_t old_pos = reader->buffer.cur_pos; uint32_t adjust = load_int32(&reader->buffer); reader->buffer.cur_pos = old_pos; reader->last_cycle -= adjust; } else if (ret == EVENT_STATE) { reader_ensure_data(reader, 8); reader->last_cycle = load_int32(&reader->buffer); reader->last_word_address = load_int8(&reader->buffer) << 16; reader->last_word_address |= load_int16(&reader->buffer); reader->last_byte_address = load_int16(&reader->buffer); } return ret; } uint8_t reader_system_type(event_reader *reader) { return load_int8(&reader->buffer); } void reader_send_gamepad_event(event_reader *reader, uint8_t pad, uint8_t button, uint8_t down) { uint8_t buffer[] = {down ? CMD_GAMEPAD_DOWN : CMD_GAMEPAD_UP, pad << 5 | button}; //TODO: Deal with the fact that we're not in blocking mode so this may not actually send all //if the buffer is full send_all(reader->socket, buffer, sizeof(buffer), 0); }