comparison event_log.c @ 2053:3414a4423de1 segacd

Merge from default
author Michael Pavone <pavone@retrodev.com>
date Sat, 15 Jan 2022 13:15:21 -0800
parents a042e046f7f2
children c4256ce2c45a
comparison
equal deleted inserted replaced
1692:5dacaef602a7 2053:3414a4423de1
1 #ifdef _WIN32
2 #define WINVER 0x501
3 #include <winsock2.h>
4 #include <ws2tcpip.h>
5 #else
6 #include <sys/types.h>
7 #include <sys/socket.h>
8 #include <unistd.h>
9 #include <netdb.h>
10 #include <netinet/tcp.h>
11 #endif
12
13 #include <stdlib.h>
14 #include <string.h>
15 #include <errno.h>
16 #include "event_log.h"
17 #include "util.h"
18 #include "blastem.h"
19 #include "saves.h"
20 #include "zlib/zlib.h"
21
//Command bytes exchanged between an event log reader and the host.
//Each one is followed by a single byte holding the pad (upper 3 bits) and
//button (lower 5 bits)
enum {
	CMD_GAMEPAD_DOWN = 0,
	CMD_GAMEPAD_UP = 1
};
26
27 static uint8_t active, fully_active;
28 static FILE *event_file;
29 static serialize_buffer buffer;
30 static uint8_t *compressed;
31 static size_t compressed_storage;
32 static z_stream output_stream;
33 static uint32_t last;
34
35 static void event_log_common_init(void)
36 {
37 init_serialize(&buffer);
38 compressed_storage = 128*1024;
39 compressed = malloc(compressed_storage);
40 deflateInit(&output_stream, 9);
41 output_stream.avail_out = compressed_storage;
42 output_stream.next_out = compressed;
43 output_stream.avail_in = 0;
44 output_stream.next_in = buffer.data;
45 last = 0;
46 active = 1;
47 }
48
49 static uint8_t multi_count;
50 static size_t multi_start;
51 static void finish_multi(void)
52 {
53 buffer.data[multi_start] |= multi_count - 2;
54 multi_count = 0;
55 }
56
57 static void file_finish(void)
58 {
59 fwrite(compressed, 1, output_stream.next_out - compressed, event_file);
60 output_stream.next_out = compressed;
61 output_stream.avail_out = compressed_storage;
62 int result = deflate(&output_stream, Z_FINISH);
63 if (Z_STREAM_END != result) {
64 fatal_error("Final deflate call returned %d\n", result);
65 }
66 fwrite(compressed, 1, output_stream.next_out - compressed, event_file);
67 fclose(event_file);
68 }
69
70 static const char el_ident[] = "BLSTEL\x02\x00";
71 void event_log_file(char *fname)
72 {
73 event_file = fopen(fname, "wb");
74 if (!event_file) {
75 warning("Failed to open event file %s for writing\n", fname);
76 return;
77 }
78 fwrite(el_ident, 1, sizeof(el_ident) - 1, event_file);
79 event_log_common_init();
80 fully_active = 1;
81 atexit(file_finish);
82 }
83
//State kept for each connected remote
typedef struct {
	//next byte of the compressed buffer to send to this remote;
	//NULL until the remote has been sent its initial state
	uint8_t *send_progress;
	int     sock;
	//player numbers assigned to this remote
	uint8_t players[1]; //TODO: Expand when support for multiple players per remote is added
	uint8_t num_players;
} remote;

static int listen_sock;
static remote remotes[7];
static int num_remotes;
//player numbers that are not currently assigned to any remote
static uint8_t available_players[7] = {
	2, 3, 4, 5, 6, 7, 8
};
static int num_available_players = 7;
//Starts serving the event log over TCP by opening an IPv4 listen socket on
//address:port. Any failure prints a warning and leaves logging disabled with
//listen_sock cleared, so later checks of listen_sock do not see a dead socket.
void event_log_tcp(char *address, char *port)
{
	struct addrinfo request, *result = NULL;
	socket_init();
	memset(&request, 0, sizeof(request));
	request.ai_family = AF_INET;
	request.ai_socktype = SOCK_STREAM;
	request.ai_flags = AI_PASSIVE;
	if (getaddrinfo(address, port, &request, &result) || !result) {
		//result is not valid here, so there is nothing to free
		warning("Failed to resolve event log listen address %s:%s\n", address, port);
		return;
	}

	listen_sock = socket(result->ai_family, result->ai_socktype, result->ai_protocol);
	if (listen_sock < 0) {
		warning("Failed to open event log listen socket on %s:%s\n", address, port);
		listen_sock = 0;
		goto cleanup_address;
	}
	int param = 1;
	setsockopt(listen_sock, SOL_SOCKET, SO_REUSEADDR, (const char *)&param, sizeof(param));
	if (bind(listen_sock, result->ai_addr, result->ai_addrlen) < 0) {
		warning("Failed to bind event log listen socket on %s:%s\n", address, port);
		socket_close(listen_sock);
		listen_sock = 0;
		goto cleanup_address;
	}
	if (listen(listen_sock, 3) < 0) {
		warning("Failed to listen for event log remotes on %s:%s\n", address, port);
		socket_close(listen_sock);
		listen_sock = 0;
		goto cleanup_address;
	}
	//presumably makes the listen socket non-blocking so accept can be polled; see socket_blocking
	socket_blocking(listen_sock, 0);
	event_log_common_init();
cleanup_address:
	freeaddrinfo(result);
}
128
129 static uint8_t *system_start;
130 static size_t system_start_size;
131 void event_system_start(system_type stype, vid_std video_std, char *name)
132 {
133 if (!active) {
134 return;
135 }
136 save_int8(&buffer, stype);
137 save_int8(&buffer, video_std);
138 size_t name_len = strlen(name);
139 if (name_len > 255) {
140 name_len = 255;
141 }
142 save_int8(&buffer, name_len);
143 save_buffer8(&buffer, name, strlen(name));
144 if (listen_sock) {
145 system_start = malloc(buffer.size);
146 system_start_size = buffer.size;
147 memcpy(system_start, buffer.data, buffer.size);
148 } else {
149 //system start header is never compressed, so write to file immediately
150 fwrite(buffer.data, 1, buffer.size, event_file);
151 }
152 buffer.size = 0;
153 }
154
155 //header formats
156 //Single byte: 4 bit type, 4 bit delta (16-31)
157 //Three Byte: 8 bit type, 16-bit delta
158 //Four byte: 8-bit type, 24-bit signed delta
159 #define FORMAT_3BYTE 0xE0
160 #define FORMAT_4BYTE 0xF0
161 static uint8_t last_event_type = 0xFF;
162 static uint32_t last_delta;
163 static void event_header(uint8_t type, uint32_t cycle)
164 {
165 uint32_t delta = cycle - last;
166 if (multi_count) {
167 if (type != last_event_type || delta != last_delta) {
168 finish_multi();
169 } else {
170 ++multi_count;
171 if (multi_count == 17) {
172 finish_multi();
173 last_event_type = 0xFF;
174 }
175 return;
176 }
177 } else if (type == last_event_type && delta == last_delta && type != EVENT_FLUSH) {
178 //make some room
179 save_int8(&buffer, 0);
180 //shift existing command
181 memmove(buffer.data + multi_start + 1, buffer.data + multi_start, buffer.size - multi_start - 1);
182 buffer.data[multi_start] = EVENT_MULTI << 4;
183 multi_count = 2;
184 return;
185 }
186 multi_start = buffer.size;
187 last_event_type = type;
188 last_delta = delta;
189
190 if (delta > 65535) {
191 save_int8(&buffer, FORMAT_4BYTE | type);
192 save_int8(&buffer, delta >> 16);
193 save_int16(&buffer, delta);
194 } else if (delta >= 16 && delta < 32) {
195 save_int8(&buffer, type << 4 | (delta - 16));
196 } else {
197 save_int8(&buffer, FORMAT_3BYTE | type);
198 save_int16(&buffer, delta);
199 }
200 }
201
202 void event_cycle_adjust(uint32_t cycle, uint32_t deduction)
203 {
204 if (!fully_active) {
205 return;
206 }
207 event_header(EVENT_ADJUST, cycle);
208 last = cycle - deduction;
209 save_int32(&buffer, deduction);
210 }
211
212 static uint8_t next_available_player(void)
213 {
214 uint8_t lowest = 0xFF;
215 int lowest_index = -1;
216 for (int i = 0; i < num_available_players; i++)
217 {
218 if (available_players[i] < lowest) {
219 lowest = available_players[i];
220 lowest_index = i;
221 }
222 }
223 if (lowest_index >= 0) {
224 available_players[lowest_index] = available_players[num_available_players - 1];
225 --num_available_players;
226 }
227 return lowest;
228 }
229
230 static void flush_socket(void)
231 {
232 int remote_sock = accept(listen_sock, NULL, NULL);
233 if (remote_sock != -1) {
234 if (num_remotes == 7) {
235 socket_close(remote_sock);
236 } else {
237 printf("remote %d connected\n", num_remotes);
238 uint8_t player = next_available_player();
239 remotes[num_remotes++] = (remote){
240 .sock = remote_sock,
241 .send_progress = NULL,
242 .players = {player},
243 .num_players = player == 0xFF ? 0 : 1
244 };
245 current_system->save_state = EVENTLOG_SLOT + 1;
246 }
247 }
248 uint8_t *min_progress = compressed;
249 for (int i = 0; i < num_remotes; i++) {
250 if (remotes[i].send_progress) {
251 uint8_t recv_buffer[1500];
252 int bytes = recv(remotes[i].sock, recv_buffer, sizeof(recv_buffer), 0);
253 for (int j = 0; j < bytes; j++)
254 {
255 uint8_t cmd = recv_buffer[j];
256 switch(cmd)
257 {
258 case CMD_GAMEPAD_DOWN:
259 case CMD_GAMEPAD_UP: {
260 ++j;
261 if (j < bytes) {
262 uint8_t button = recv_buffer[j];
263 uint8_t pad = (button >> 5) - 1;
264 button &= 0x1F;
265 if (pad < remotes[i].num_players) {
266 pad = remotes[i].players[pad];
267 if (cmd == CMD_GAMEPAD_DOWN) {
268 current_system->gamepad_down(current_system, pad, button);
269 } else {
270 current_system->gamepad_up(current_system, pad, button);
271 }
272 }
273 } else {
274 warning("Received incomplete command %X\n", cmd);
275 }
276 break;
277 }
278 default:
279 warning("Unrecognized remote command %X\n", cmd);
280 j = bytes;
281 }
282 }
283 int sent = 1;
284 while (sent && output_stream.next_out > remotes[i].send_progress)
285 {
286 sent = send(remotes[i].sock, remotes[i].send_progress, output_stream.next_out - remotes[i].send_progress, 0);
287 if (sent >= 0) {
288 remotes[i].send_progress += sent;
289 } else if (!socket_error_is_wouldblock()) {
290 socket_close(remotes[i].sock);
291 for (int j = 0; j < remotes[i].num_players; j++) {
292 available_players[num_available_players++] = remotes[i].players[j];
293 }
294 remotes[i] = remotes[num_remotes-1];
295 num_remotes--;
296 if (!num_remotes) {
297 //last remote disconnected, reset buffers/deflate
298 fully_active = 0;
299 deflateReset(&output_stream);
300 output_stream.next_out = compressed;
301 output_stream.avail_out = compressed_storage;
302 buffer.size = 0;
303 }
304 i--;
305 break;
306 }
307 if (remotes[i].send_progress > min_progress) {
308 min_progress = remotes[i].send_progress;
309 }
310 }
311 }
312 }
313 if (min_progress == output_stream.next_out) {
314 output_stream.next_out = compressed;
315 output_stream.avail_out = compressed_storage;
316 for (int i = 0; i < num_remotes; i++) {
317 if (remotes[i].send_progress) {
318 remotes[i].send_progress = compressed;
319 }
320 }
321 }
322 }
323
324 uint8_t wrote_since_last_flush;
325 void event_log(uint8_t type, uint32_t cycle, uint8_t size, uint8_t *payload)
326 {
327 if (!fully_active) {
328 return;
329 }
330 event_header(type, cycle);
331 last = cycle;
332 save_buffer8(&buffer, payload, size);
333 if (!multi_count) {
334 last_event_type = 0xFF;
335 output_stream.avail_in = buffer.size - (output_stream.next_in - buffer.data);
336 int result = deflate(&output_stream, Z_NO_FLUSH);
337 if (result != Z_OK) {
338 fatal_error("deflate returned %d\n", result);
339 }
340 if (listen_sock) {
341 if ((output_stream.next_out - compressed) > 1280 || !output_stream.avail_out) {
342 flush_socket();
343 wrote_since_last_flush = 1;
344 }
345 } else if (!output_stream.avail_out) {
346 fwrite(compressed, 1, compressed_storage, event_file);
347 output_stream.next_out = compressed;
348 output_stream.avail_out = compressed_storage;
349 }
350 if (!output_stream.avail_in) {
351 buffer.size = 0;
352 output_stream.next_in = buffer.data;
353 }
354 }
355 }
356
357 static uint32_t last_word_address;
358 void event_vram_word(uint32_t cycle, uint32_t address, uint16_t value)
359 {
360 uint32_t delta = address - last_word_address;
361 if (delta < 256) {
362 uint8_t buffer[3] = {delta, value >> 8, value};
363 event_log(EVENT_VRAM_WORD_DELTA, cycle, sizeof(buffer), buffer);
364 } else {
365 uint8_t buffer[5] = {address >> 16, address >> 8, address, value >> 8, value};
366 event_log(EVENT_VRAM_WORD, cycle, sizeof(buffer), buffer);
367 }
368 last_word_address = address;
369 }
370
371 static uint32_t last_byte_address;
372 void event_vram_byte(uint32_t cycle, uint16_t address, uint8_t byte, uint8_t auto_inc)
373 {
374 uint32_t delta = address - last_byte_address;
375 if (delta == 1) {
376 event_log(EVENT_VRAM_BYTE_ONE, cycle, sizeof(byte), &byte);
377 } else if (delta == auto_inc) {
378 event_log(EVENT_VRAM_BYTE_AUTO, cycle, sizeof(byte), &byte);
379 } else if (delta < 256) {
380 uint8_t buffer[2] = {delta, byte};
381 event_log(EVENT_VRAM_BYTE_DELTA, cycle, sizeof(buffer), buffer);
382 } else {
383 uint8_t buffer[3] = {address >> 8, address, byte};
384 event_log(EVENT_VRAM_BYTE, cycle, sizeof(buffer), buffer);
385 }
386 last_byte_address = address;
387 }
388
//Keeps calling send until size bytes of data have been sent or send stops
//making progress (returns an error or 0).
//Returns the number of bytes actually sent; anything less than size means failure.
static size_t send_all(int sock, uint8_t *data, size_t size, int flags)
{
	size_t total = 0;
	while (total < size)
	{
		//send reports errors as a negative value, so the result has to be
		//held in a signed type before it is added to the running total
		int sent = send(sock, data + total, size - total, flags);
		if (sent <= 0) {
			break;
		}
		total += sent;
	}
	return total;
}
401
402 void deflate_flush(uint8_t full)
403 {
404 output_stream.avail_in = buffer.size - (output_stream.next_in - buffer.data);
405 uint8_t force = full;
406 while (output_stream.avail_in || force)
407 {
408 if (!output_stream.avail_out) {
409 size_t old_storage = compressed_storage;
410 uint8_t *old_compressed = compressed;
411 compressed_storage *= 2;
412 compressed = realloc(compressed, compressed_storage);
413 output_stream.next_out = compressed + old_storage;
414 output_stream.avail_out = old_storage;
415 for (int i = 0; i < num_remotes; i++) {
416 if (remotes[i].send_progress) {
417 remotes[i].send_progress = compressed + (remotes[i].send_progress - old_compressed);
418 }
419 }
420 }
421 int result = deflate(&output_stream, full ? Z_FINISH : Z_SYNC_FLUSH);
422 if (result != (full ? Z_STREAM_END : Z_OK)) {
423 fatal_error("deflate returned %d\n", result);
424 }
425 if (full && result == Z_STREAM_END) {
426 result = deflateReset(&output_stream);
427 if (result != Z_OK) {
428 fatal_error("deflateReset returned %d\n", result);
429 }
430 }
431 force = 0;
432 }
433 output_stream.next_in = buffer.data;
434 buffer.size = 0;
435 }
436
437 void event_state(uint32_t cycle, serialize_buffer *state)
438 {
439 if (!fully_active) {
440 last = cycle;
441 }
442 uint8_t header[] = {
443 EVENT_STATE << 4, last >> 24, last >> 16, last >> 8, last,
444 last_word_address >> 16, last_word_address >> 8, last_word_address,
445 last_byte_address >> 8, last_byte_address,
446 state->size >> 16, state->size >> 8, state->size
447 };
448 uint8_t sent_system_start = 0;
449 for (int i = 0; i < num_remotes; i++)
450 {
451 if (!remotes[i].send_progress) {
452 if (send_all(remotes[i].sock, system_start, system_start_size, 0) == system_start_size) {
453 sent_system_start = 1;
454 } else {
455 socket_close(remotes[i].sock);
456 remotes[i] = remotes[num_remotes-1];
457 num_remotes--;
458 i--;
459 }
460 }
461 }
462 if (sent_system_start) {
463 if (fully_active) {
464 if (multi_count) {
465 finish_multi();
466 }
467 //full flush is needed so new and old clients can share a stream
468 deflate_flush(1);
469 }
470 save_buffer8(&buffer, header, sizeof(header));
471 save_buffer8(&buffer, state->data, state->size);
472 size_t old_compressed_size = output_stream.next_out - compressed;
473 deflate_flush(1);
474 size_t state_size = output_stream.next_out - compressed - old_compressed_size;
475 for (int i = 0; i < num_remotes; i++) {
476 if (!remotes[i].send_progress) {
477 if (send_all(remotes[i].sock, compressed + old_compressed_size, state_size, 0) == state_size) {
478 remotes[i].send_progress = compressed + old_compressed_size;
479 socket_blocking(remotes[i].sock, 0);
480 int flag = 1;
481 setsockopt(remotes[i].sock, IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag));
482 fully_active = 1;
483 } else {
484 socket_close(remotes[i].sock);
485 remotes[i] = remotes[num_remotes-1];
486 num_remotes--;
487 i--;
488 }
489 }
490 }
491 output_stream.next_out = compressed + old_compressed_size;
492 output_stream.avail_out = compressed_storage - old_compressed_size;
493 }
494 }
495
496 void event_flush(uint32_t cycle)
497 {
498 if (!active) {
499 return;
500 }
501 if (fully_active) {
502 event_header(EVENT_FLUSH, cycle);
503 last = cycle;
504
505 deflate_flush(0);
506 }
507 if (event_file) {
508 fwrite(compressed, 1, output_stream.next_out - compressed, event_file);
509 fflush(event_file);
510 output_stream.next_out = compressed;
511 output_stream.avail_out = compressed_storage;
512 } else if (listen_sock) {
513 flush_socket();
514 wrote_since_last_flush = 0;
515 }
516 }
517
518 void event_soft_flush(uint32_t cycle)
519 {
520 if (!fully_active || wrote_since_last_flush || event_file) {
521 return;
522 }
523 event_header(EVENT_FLUSH, cycle);
524 last = cycle;
525
526 deflate_flush(0);
527 flush_socket();
528 }
529
530 static void init_event_reader_common(event_reader *reader)
531 {
532 reader->last_cycle = 0;
533 reader->repeat_event = 0xFF;
534 reader->storage = 512 * 1024;
535 init_deserialize(&reader->buffer, malloc(reader->storage), reader->storage);
536 reader->buffer.size = 0;
537 memset(&reader->input_stream, 0, sizeof(reader->input_stream));
538
539 }
540
541 void init_event_reader(event_reader *reader, uint8_t *data, size_t size)
542 {
543 reader->socket = 0;
544 reader->last_cycle = 0;
545 reader->repeat_event = 0xFF;
546 init_event_reader_common(reader);
547 uint8_t name_len = data[1];
548 reader->buffer.size = name_len + 2;
549 memcpy(reader->buffer.data, data, reader->buffer.size);
550 reader->input_stream.next_in = data + reader->buffer.size;
551 reader->input_stream.avail_in = size - reader->buffer.size;
552
553 int result = inflateInit(&reader->input_stream);
554 if (Z_OK != result) {
555 fatal_error("inflateInit returned %d\n", result);
556 }
557 reader->input_stream.next_out = reader->buffer.data + reader->buffer.size;
558 reader->input_stream.avail_out = reader->storage - reader->buffer.size;
559 result = inflate(&reader->input_stream, Z_NO_FLUSH);
560 if (Z_OK != result && Z_STREAM_END != result) {
561 fatal_error("inflate returned %d\n", result);
562 }
563 reader->buffer.size = reader->input_stream.next_out - reader->buffer.data;
564 }
565
566 void init_event_reader_tcp(event_reader *reader, char *address, char *port)
567 {
568 struct addrinfo request, *result;
569 socket_init();
570 memset(&request, 0, sizeof(request));
571 request.ai_family = AF_INET;
572 request.ai_socktype = SOCK_STREAM;
573 request.ai_flags = AI_PASSIVE;
574 getaddrinfo(address, port, &request, &result);
575
576 reader->socket = socket(result->ai_family, result->ai_socktype, result->ai_protocol);
577 if (reader->socket < 0) {
578 fatal_error("Failed to create socket for event log connection to %s:%s\n", address, port);
579 }
580 if (connect(reader->socket, result->ai_addr, result->ai_addrlen) < 0) {
581 fatal_error("Failed to connect to %s:%s for event log stream\n", address, port);
582 }
583
584 init_event_reader_common(reader);
585 reader->socket_buffer_size = 256 * 1024;
586 reader->socket_buffer = malloc(reader->socket_buffer_size);
587
588 while(reader->buffer.size < 3 || reader->buffer.size < 3 + reader->buffer.data[2])
589 {
590 int bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0);
591 if (bytes < 0) {
592 fatal_error("Failed to receive system init from %s:%s\n", address, port);
593 }
594 reader->buffer.size += bytes;
595 }
596 size_t init_msg_len = 3 + reader->buffer.data[2];
597 memcpy(reader->socket_buffer, reader->buffer.data + init_msg_len, reader->buffer.size - init_msg_len);
598 reader->input_stream.next_in = reader->socket_buffer;
599 reader->input_stream.avail_in = reader->buffer.size - init_msg_len;
600 reader->buffer.size = init_msg_len;
601 int res = inflateInit(&reader->input_stream);
602 if (Z_OK != res) {
603 fatal_error("inflateInit returned %d\n", res);
604 }
605 reader->input_stream.next_out = reader->buffer.data + init_msg_len;
606 reader->input_stream.avail_out = reader->storage - init_msg_len;
607 res = inflate(&reader->input_stream, Z_NO_FLUSH);
608 if (Z_OK != res && Z_BUF_ERROR != res) {
609 fatal_error("inflate returned %d in init_event_reader_tcp\n", res);
610 }
611 int flag = 1;
612 setsockopt(reader->socket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag));
613 }
614
615 static void read_from_socket(event_reader *reader)
616 {
617 if (reader->socket_buffer_size - reader->input_stream.avail_in < 128 * 1024) {
618 reader->socket_buffer_size *= 2;
619 uint8_t *new_buf = malloc(reader->socket_buffer_size);
620 memcpy(new_buf, reader->input_stream.next_in, reader->input_stream.avail_in);
621 free(reader->socket_buffer);
622 reader->socket_buffer = new_buf;
623 reader->input_stream.next_in = new_buf;
624 } else if (
625 reader->input_stream.next_in - reader->socket_buffer >= reader->input_stream.avail_in
626 && reader->input_stream.next_in - reader->socket_buffer + reader->input_stream.avail_in >= reader->socket_buffer_size/2
627 ) {
628 memmove(reader->socket_buffer, reader->input_stream.next_in, reader->input_stream.avail_in);
629 reader->input_stream.next_in = reader->socket_buffer;
630 }
631 uint8_t *space_start = reader->input_stream.next_in + reader->input_stream.avail_in;
632 size_t space = (reader->socket_buffer + reader->socket_buffer_size) - space_start;
633 int bytes = recv(reader->socket, space_start, space, 0);
634 if (bytes >= 0) {
635 reader->input_stream.avail_in += bytes;
636 } else if (!socket_error_is_wouldblock()) {
637 fatal_error("Connection closed, error = %X\n", socket_last_error());
638 }
639 }
640
641 static void inflate_flush(event_reader *reader)
642 {
643 if (reader->buffer.cur_pos > reader->storage / 2) {
644 memmove(reader->buffer.data, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos);
645 reader->buffer.size -= reader->buffer.cur_pos;
646 reader->buffer.cur_pos = 0;
647 reader->input_stream.next_out = reader->buffer.data + reader->buffer.size;
648 reader->input_stream.avail_out = reader->storage - reader->buffer.size;
649 }
650 int result = inflate(&reader->input_stream, Z_SYNC_FLUSH);
651 if (Z_OK != result && Z_STREAM_END != result) {
652 fatal_error("inflate returned %d\n", result);
653 }
654 reader->buffer.size = reader->input_stream.next_out - reader->buffer.data;
655 if (result == Z_STREAM_END && (reader->socket || reader->input_stream.avail_in)) {
656 inflateReset(&reader->input_stream);
657 if (reader->input_stream.avail_in) {
658 inflate_flush(reader);
659 }
660 }
661
662 }
663
664 void reader_ensure_data(event_reader *reader, size_t bytes)
665 {
666 if (reader->buffer.size - reader->buffer.cur_pos < bytes) {
667 if (reader->input_stream.avail_in) {
668 inflate_flush(reader);
669 }
670 if (reader->socket) {
671 while (reader->buffer.size - reader->buffer.cur_pos < bytes) {
672 read_from_socket(reader);
673 inflate_flush(reader);
674 }
675 }
676 }
677 }
678
679 uint8_t reader_next_event(event_reader *reader, uint32_t *cycle_out)
680 {
681 if (reader->repeat_remaining) {
682 reader->repeat_remaining--;
683 *cycle_out = reader->last_cycle + reader->repeat_delta;
684 reader->last_cycle = *cycle_out;
685 return reader->repeat_event;
686 }
687 reader_ensure_data(reader, 1);
688 uint8_t header = load_int8(&reader->buffer);
689 uint8_t ret;
690 uint32_t delta;
691 uint8_t multi_start = 0;
692 if ((header & 0xF0) == (EVENT_MULTI << 4)) {
693 reader->repeat_remaining = (header & 0xF) + 1;
694 multi_start = 1;
695 reader_ensure_data(reader, 1);
696 header = load_int8(&reader->buffer);
697 }
698 if ((header & 0xF0) < FORMAT_3BYTE) {
699 delta = (header & 0xF) + 16;
700 ret = header >> 4;
701 } else if ((header & 0xF0) == FORMAT_3BYTE) {
702 reader_ensure_data(reader, 2);
703 delta = load_int16(&reader->buffer);
704 ret = header & 0xF;
705 } else {
706 reader_ensure_data(reader, 3);
707 delta = load_int8(&reader->buffer) << 16;
708 //sign extend 24-bit delta to 32-bit
709 if (delta & 0x800000) {
710 delta |= 0xFF000000;
711 }
712 delta |= load_int16(&reader->buffer);
713 ret = header & 0xF;
714 }
715 if (multi_start) {
716 reader->repeat_event = ret;
717 reader->repeat_delta = delta;
718 }
719 *cycle_out = reader->last_cycle + delta;
720 reader->last_cycle = *cycle_out;
721 if (ret == EVENT_ADJUST) {
722 reader_ensure_data(reader, 4);
723 size_t old_pos = reader->buffer.cur_pos;
724 uint32_t adjust = load_int32(&reader->buffer);
725 reader->buffer.cur_pos = old_pos;
726 reader->last_cycle -= adjust;
727 } else if (ret == EVENT_STATE) {
728 reader_ensure_data(reader, 8);
729 reader->last_cycle = load_int32(&reader->buffer);
730 reader->last_word_address = load_int8(&reader->buffer) << 16;
731 reader->last_word_address |= load_int16(&reader->buffer);
732 reader->last_byte_address = load_int16(&reader->buffer);
733 }
734 return ret;
735 }
736
737 uint8_t reader_system_type(event_reader *reader)
738 {
739 return load_int8(&reader->buffer);
740 }
741
742 void reader_send_gamepad_event(event_reader *reader, uint8_t pad, uint8_t button, uint8_t down)
743 {
744 uint8_t buffer[] = {down ? CMD_GAMEPAD_DOWN : CMD_GAMEPAD_UP, pad << 5 | button};
745 //TODO: Deal with the fact that we're not in blocking mode so this may not actually send all
746 //if the buffer is full
747 send_all(reader->socket, buffer, sizeof(buffer), 0);
748 }