comparison event_log.c @ 1947:c36102d09351

Add missing netplay files and add in support for sending gamepad commands back to host
author Michael Pavone <pavone@retrodev.com>
date Wed, 29 Apr 2020 23:42:16 -0700
parents
children d01527615c7c
comparison
equal deleted inserted replaced
1946:c3c62dbf1ceb 1947:c36102d09351
1 #ifdef _WIN32
2 #define WINVER 0x501
3 #include <winsock2.h>
4 #include <ws2tcpip.h>
5 #else
6 #include <sys/types.h>
7 #include <sys/socket.h>
8 #include <unistd.h>
9 #include <fcntl.h>
10 #include <netdb.h>
11 #include <netinet/tcp.h>
12 #endif
13
14 #include <errno.h>
15 #include "event_log.h"
16 #include "util.h"
17 #include "blastem.h"
18 #include "saves.h"
19
//Commands a remote can send back to the host. Each command byte is followed
//by one byte holding the pad number in the top 3 bits and the button in the
//low 5 bits
enum {
	CMD_GAMEPAD_DOWN = 0,
	CMD_GAMEPAD_UP = 1
};
24
25 static uint8_t active, fully_active;
26 static FILE *event_file;
27 static serialize_buffer buffer;
28
29 static const char el_ident[] = "BLSTEL\x02\x00";
30 static uint32_t last;
31 void event_log_file(char *fname)
32 {
33 event_file = fopen(fname, "wb");
34 if (!event_file) {
35 warning("Failed to open event file %s for writing\n", fname);
36 return;
37 }
38 fwrite(el_ident, 1, sizeof(el_ident) - 1, event_file);
39 init_serialize(&buffer);
40 active = fully_active = 1;
41 last = 0;
42 }
43
static int listen_sock, remotes[7];
static int num_remotes;

//Starts listening for event log remotes on address:port.
//On success listen_sock holds a non-blocking listening socket and logging is
//marked active; events only start being recorded once a remote has received
//a state snapshot. On any failure a warning is printed, listen_sock is left
//untouched and logging stays inactive
void event_log_tcp(char *address, char *port)
{
	struct addrinfo request, *result = NULL;
	memset(&request, 0, sizeof(request));
	request.ai_family = AF_INET;
	request.ai_socktype = SOCK_STREAM;
	request.ai_flags = AI_PASSIVE;
	//result is only valid when getaddrinfo succeeds, so it must be checked
	if (getaddrinfo(address, port, &request, &result) != 0 || !result) {
		warning("Failed to resolve event log listen address %s:%s\n", address, port);
		return;
	}

	int sock = socket(result->ai_family, result->ai_socktype, result->ai_protocol);
	if (sock < 0) {
		warning("Failed to open event log listen socket on %s:%s\n", address, port);
		goto cleanup_address;
	}
	int reuse = 1;
	setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &reuse, sizeof(reuse));
	if (bind(sock, result->ai_addr, result->ai_addrlen) < 0) {
		warning("Failed to bind event log listen socket on %s:%s\n", address, port);
		close(sock);
		goto cleanup_address;
	}
	if (listen(sock, 3) < 0) {
		warning("Failed to listen for event log remotes on %s:%s\n", address, port);
		close(sock);
		goto cleanup_address;
	}
	//accept is polled from flush_socket, so it must never block
	fcntl(sock, F_SETFL, O_NONBLOCK);
	//only publish the socket once it's fully set up so a failed attempt
	//never leaves a closed or invalid descriptor behind
	listen_sock = sock;
	active = 1;
cleanup_address:
	freeaddrinfo(result);
}
77
78 static uint8_t *system_start;
79 static size_t system_start_size;
80 void event_system_start(system_type stype, vid_std video_std, char *name)
81 {
82 if (!active) {
83 return;
84 }
85 save_int8(&buffer, stype);
86 save_int8(&buffer, video_std);
87 size_t name_len = strlen(name);
88 if (name_len > 255) {
89 name_len = 255;
90 }
91 save_int8(&buffer, name_len);
92 save_buffer8(&buffer, name, strlen(name));
93 if (!fully_active) {
94 system_start = malloc(buffer.size);
95 system_start_size = buffer.size;
96 memcpy(system_start, buffer.data, buffer.size);
97 buffer.size = 0;
98 }
99 }
100
101 //header formats
102 //Single byte: 4 bit type, 4 bit delta (16-31)
103 //Three Byte: 8 bit type, 16-bit delta
104 //Four byte: 8-bit type, 24-bit signed delta
105 #define FORMAT_3BYTE 0xE0
106 #define FORMAT_4BYTE 0xF0
107 static void event_header(uint8_t type, uint32_t cycle)
108 {
109 uint32_t delta = cycle - last;
110 if (delta > 65535) {
111 save_int8(&buffer, FORMAT_4BYTE | type);
112 save_int8(&buffer, delta >> 16);
113 save_int16(&buffer, delta);
114 } else if (delta >= 16 && delta < 32) {
115 save_int8(&buffer, type << 4 | (delta - 16));
116 } else {
117 save_int8(&buffer, FORMAT_3BYTE | type);
118 save_int16(&buffer, delta);
119 }
120 }
121
122 void event_cycle_adjust(uint32_t cycle, uint32_t deduction)
123 {
124 if (!fully_active) {
125 return;
126 }
127 event_header(EVENT_ADJUST, cycle);
128 last = cycle - deduction;
129 save_int32(&buffer, deduction);
130 }
131
132 static size_t remote_send_progress[7];
133 static uint8_t remote_needs_state[7];
134 static void flush_socket(void)
135 {
136 int remote = accept(listen_sock, NULL, NULL);
137 if (remote != -1) {
138 if (num_remotes == 7) {
139 close(remote);
140 } else {
141 printf("remote %d connected\n", num_remotes);
142 remotes[num_remotes] = remote;
143 remote_needs_state[num_remotes++] = 1;
144 current_system->save_state = EVENTLOG_SLOT + 1;
145 }
146 }
147 size_t min_progress = 0;
148 for (int i = 0; i < num_remotes; i++) {
149 errno = 0;
150 int sent = 1;
151 if (remote_needs_state[i]) {
152 remote_send_progress[i] = buffer.size;
153 } else {
154 uint8_t buffer[1500];
155 int bytes = recv(remotes[i], buffer, sizeof(buffer), 0);
156 for (int j = 0; j < bytes; j++)
157 {
158 uint8_t cmd = buffer[j];
159 switch(cmd)
160 {
161 case CMD_GAMEPAD_DOWN:
162 case CMD_GAMEPAD_UP: {
163 ++j;
164 if (j < bytes) {
165 uint8_t button = buffer[j];
166 uint8_t pad = (button >> 5) + i + 1;
167 button &= 0x1F;
168 if (cmd == CMD_GAMEPAD_DOWN) {
169 current_system->gamepad_down(current_system, pad, button);
170 } else {
171 current_system->gamepad_up(current_system, pad, button);
172 }
173 } else {
174 warning("Received incomplete command %X\n", cmd);
175 }
176 break;
177 }
178 default:
179 warning("Unrecognized remote command %X\n", cmd);
180 j = bytes;
181 }
182 }
183 }
184 while (sent && buffer.size - remote_send_progress[i])
185 {
186 sent = send(remotes[i], buffer.data + remote_send_progress[i], buffer.size - remote_send_progress[i], 0);
187 if (sent >= 0) {
188 remote_send_progress[i] += sent;
189 } else if (errno != EAGAIN && errno != EWOULDBLOCK) {
190 close(remotes[i]);
191 remotes[i] = remotes[num_remotes-1];
192 remote_send_progress[i] = remote_send_progress[num_remotes-1];
193 remote_needs_state[i] = remote_needs_state[num_remotes-1];
194 num_remotes--;
195 i--;
196 break;
197 }
198 if (remote_send_progress[i] > min_progress) {
199 min_progress = remote_send_progress[i];
200 }
201 }
202 }
203 if (min_progress == buffer.size) {
204 buffer.size = 0;
205 memset(remote_send_progress, 0, sizeof(remote_send_progress));
206 }
207 }
208
209 void event_log(uint8_t type, uint32_t cycle, uint8_t size, uint8_t *payload)
210 {
211 if (!fully_active) {
212 return;
213 }
214 event_header(type, cycle);
215 last = cycle;
216 save_buffer8(&buffer, payload, size);
217 if (listen_sock && buffer.size > 1280) {
218 flush_socket();
219 }
220 }
221
222 static uint32_t last_word_address;
223 void event_vram_word(uint32_t cycle, uint32_t address, uint16_t value)
224 {
225 uint32_t delta = address - last_word_address;
226 if (delta < 256) {
227 uint8_t buffer[3] = {delta, value >> 8, value};
228 event_log(EVENT_VRAM_WORD_DELTA, cycle, sizeof(buffer), buffer);
229 } else {
230 uint8_t buffer[5] = {address >> 16, address >> 8, address, value >> 8, value};
231 event_log(EVENT_VRAM_WORD, cycle, sizeof(buffer), buffer);
232 }
233 last_word_address = address;
234 }
235
236 static uint32_t last_byte_address;
237 void event_vram_byte(uint32_t cycle, uint16_t address, uint8_t byte, uint8_t auto_inc)
238 {
239 uint32_t delta = address - last_byte_address;
240 if (delta == 1) {
241 event_log(EVENT_VRAM_BYTE_ONE, cycle, sizeof(byte), &byte);
242 } else if (delta == auto_inc) {
243 event_log(EVENT_VRAM_BYTE_AUTO, cycle, sizeof(byte), &byte);
244 } else if (delta < 256) {
245 uint8_t buffer[2] = {delta, byte};
246 event_log(EVENT_VRAM_BYTE_DELTA, cycle, sizeof(buffer), buffer);
247 } else {
248 uint8_t buffer[3] = {address >> 8, address, byte};
249 event_log(EVENT_VRAM_BYTE, cycle, sizeof(buffer), buffer);
250 }
251 last_byte_address = address;
252 }
253
//Sends size bytes from data on sock, calling send repeatedly until
//everything has been written or send stops making progress.
//Returns the number of bytes actually sent; anything less than size means
//send failed (or returned 0) part way through
static size_t send_all(int sock, uint8_t *data, size_t size, int flags)
{
	size_t total = 0;
	while (total < size)
	{
		//send's result is signed; keeping it signed stops -1 from being
		//mistaken for a huge successful write
		ssize_t sent = send(sock, data + total, size - total, flags);
		if (sent <= 0) {
			break;
		}
		total += sent;
	}
	return total;
}
266
267 void event_state(uint32_t cycle, serialize_buffer *state)
268 {
269 if (!fully_active) {
270 last = cycle;
271 }
272 uint8_t header[] = {
273 EVENT_STATE << 4, last >> 24, last >> 16, last >> 8, last,
274 last_word_address >> 16, last_word_address >> 8, last_word_address,
275 last_byte_address >> 8, last_byte_address,
276 state->size >> 16, state->size >> 8, state->size
277 };
278 for (int i = 0; i < num_remotes; i++)
279 {
280 if (remote_needs_state[i]) {
281 if(
282 send_all(remotes[i], system_start, system_start_size, 0) == system_start_size
283 && send_all(remotes[i], header, sizeof(header), 0) == sizeof(header)
284 && send_all(remotes[i], state->data, state->size, 0) == state->size
285 ) {
286 remote_send_progress[i] = buffer.size;
287 remote_needs_state[i] = 0;
288 fcntl(remotes[i], F_SETFL, O_NONBLOCK);
289 int flag = 1;
290 setsockopt(remotes[i], IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
291 fully_active = 1;
292 } else {
293 close(remotes[i]);
294 remotes[i] = remotes[num_remotes-1];
295 remote_send_progress[i] = remote_send_progress[num_remotes-1];
296 remote_needs_state[i] = remote_needs_state[num_remotes-1];
297 num_remotes--;
298 i--;
299 }
300 }
301 }
302 }
303
304 void event_flush(uint32_t cycle)
305 {
306 if (!active) {
307 return;
308 }
309 if (fully_active) {
310 event_log(EVENT_FLUSH, cycle, 0, NULL);
311 }
312 if (event_file) {
313 fwrite(buffer.data, 1, buffer.size, event_file);
314 fflush(event_file);
315 buffer.size = 0;
316 } else if (listen_sock) {
317 flush_socket();
318 }
319 }
320
321 void init_event_reader(event_reader *reader, uint8_t *data, size_t size)
322 {
323 reader->socket = 0;
324 reader->last_cycle = 0;
325 init_deserialize(&reader->buffer, data, size);
326 }
327
328 void init_event_reader_tcp(event_reader *reader, char *address, char *port)
329 {
330 struct addrinfo request, *result;
331 memset(&request, 0, sizeof(request));
332 request.ai_family = AF_INET;
333 request.ai_socktype = SOCK_STREAM;
334 request.ai_flags = AI_PASSIVE;
335 getaddrinfo(address, port, &request, &result);
336
337 reader->socket = socket(result->ai_family, result->ai_socktype, result->ai_protocol);
338 if (reader->socket < 0) {
339 fatal_error("Failed to create socket for event log connection to %s:%s\n", address, port);
340 }
341 if (connect(reader->socket, result->ai_addr, result->ai_addrlen) < 0) {
342 fatal_error("Failed to connect to %s:%s for event log stream\n", address, port);
343 }
344
345 reader->storage = 512 * 1024;
346 reader->last_cycle = 0;
347 init_deserialize(&reader->buffer, malloc(reader->storage), reader->storage);
348 reader->buffer.size = 0;
349 while(reader->buffer.size < 3 || reader->buffer.size < 3 + reader->buffer.data[2])
350 {
351 int bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0);
352 if (bytes < 0) {
353 fatal_error("Failed to receive system init from %s:%s\n", address, port);
354 }
355 reader->buffer.size += bytes;
356 }
357 fcntl(reader->socket, F_SETFL, O_NONBLOCK);
358 int flag = 1;
359 setsockopt(reader->socket, IPPROTO_TCP, TCP_NODELAY, &flag, sizeof(flag));
360 }
361
362 uint8_t reader_next_event(event_reader *reader, uint32_t *cycle_out)
363 {
364 if (reader->socket) {
365 uint8_t blocking = 0;
366 if (reader->buffer.size - reader->buffer.cur_pos < 9) {
367 //set back to block mode
368 fcntl(reader->socket, F_SETFL, 0);
369 blocking = 1;
370 }
371 if (reader->storage - (reader->buffer.size - reader->buffer.cur_pos) < 128 * 1024) {
372 reader->storage *= 2;
373 uint8_t *new_buf = malloc(reader->storage);
374 memcpy(new_buf, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos);
375 free(reader->buffer.data);
376 reader->buffer.data = new_buf;
377 reader->buffer.size -= reader->buffer.cur_pos;
378 reader->buffer.cur_pos = 0;
379 } else if (reader->buffer.cur_pos >= reader->buffer.size/2 && reader->buffer.size >= reader->storage/2) {
380 memmove(reader->buffer.data, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos);
381 reader->buffer.size -= reader->buffer.cur_pos;
382 reader->buffer.cur_pos = 0;
383 }
384 int bytes = 128;
385 while (bytes > 127 && reader->buffer.size < reader->storage)
386 {
387 errno = 0;
388 bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0);
389 if (bytes >= 0) {
390 reader->buffer.size += bytes;
391 if (blocking && reader->buffer.size - reader->buffer.cur_pos >= 9) {
392 fcntl(reader->socket, F_SETFL, O_NONBLOCK);
393 }
394 } else if (errno != EAGAIN && errno != EWOULDBLOCK) {
395 puts("Connection closed");
396 exit(0);
397 }
398 }
399 }
400 uint8_t header = load_int8(&reader->buffer);
401 uint8_t ret;
402 uint32_t delta;
403 if ((header & 0xF0) < FORMAT_3BYTE) {
404 delta = header & 0xF + 16;
405 ret = header >> 4;
406 } else if ((header & 0xF0) == FORMAT_3BYTE) {
407 delta = load_int16(&reader->buffer);
408 ret = header & 0xF;
409 } else {
410 delta = load_int8(&reader->buffer) << 16;
411 //sign extend 24-bit delta to 32-bit
412 if (delta & 0x800000) {
413 delta |= 0xFF000000;
414 }
415 delta |= load_int16(&reader->buffer);
416 ret = header & 0xF;
417 }
418 *cycle_out = reader->last_cycle + delta;
419 reader->last_cycle = *cycle_out;
420 if (ret == EVENT_ADJUST) {
421 size_t old_pos = reader->buffer.cur_pos;
422 uint32_t adjust = load_int32(&reader->buffer);
423 reader->buffer.cur_pos = old_pos;
424 reader->last_cycle -= adjust;
425 } else if (ret == EVENT_STATE) {
426 reader->last_cycle = load_int32(&reader->buffer);
427 reader->last_word_address = load_int8(&reader->buffer) << 16;
428 reader->last_word_address |= load_int16(&reader->buffer);
429 reader->last_byte_address = load_int16(&reader->buffer);
430 }
431 return ret;
432 }
433
434 uint8_t reader_system_type(event_reader *reader)
435 {
436 return load_int8(&reader->buffer);
437 }
438
439 void reader_send_gamepad_event(event_reader *reader, uint8_t pad, uint8_t button, uint8_t down)
440 {
441 uint8_t buffer[] = {down ? CMD_GAMEPAD_DOWN : CMD_GAMEPAD_UP, pad << 5 | button};
442 //TODO: Deal with the fact that we're not in blocking mode so this may not actually send all
443 //if the buffer is full
444 send_all(reader->socket, buffer, sizeof(buffer), 0);
445 }