comparison event_log.c @ 1957:ba06346611a1

Fix some netplay issues
author Mike Pavone <pavone@retrodev.com>
date Sat, 02 May 2020 00:52:21 -0700
parents 275f1c4bdb25
children 9c01945b5d20
comparison
equal deleted inserted replaced
1956:275f1c4bdb25 1957:ba06346611a1
122 last_event_type = 0xFF; 122 last_event_type = 0xFF;
123 multi_count = 0; 123 multi_count = 0;
124 } 124 }
125 return; 125 return;
126 } 126 }
127 } else if (type == last_event_type && delta == last_delta) { 127 } else if (type == last_event_type && delta == last_delta && type != EVENT_FLUSH) {
128 //make some room 128 //make some room
129 save_int8(&buffer, 0); 129 save_int8(&buffer, 0);
130 //shift existing command 130 //shift existing command
131 memmove(buffer.data + multi_start + 1, buffer.data + multi_start, buffer.size - multi_start - 1); 131 memmove(buffer.data + multi_start + 1, buffer.data + multi_start, buffer.size - multi_start - 1);
132 buffer.data[multi_start] = EVENT_MULTI << 4; 132 buffer.data[multi_start] = EVENT_MULTI << 4;
244 } 244 }
245 event_header(type, cycle); 245 event_header(type, cycle);
246 last = cycle; 246 last = cycle;
247 save_buffer8(&buffer, payload, size); 247 save_buffer8(&buffer, payload, size);
248 if (listen_sock && buffer.size > 1280) { 248 if (listen_sock && buffer.size > 1280) {
249 if (multi_count) {
250 buffer.data[multi_start] |= multi_count - 2;
251 multi_count = 0;
252 last_event_type = 0xFF;
253 }
249 flush_socket(); 254 flush_socket();
250 } 255 }
251 } 256 }
252 257
253 static uint32_t last_word_address; 258 static uint32_t last_word_address;
392 socket_blocking(reader->socket, 0); 397 socket_blocking(reader->socket, 0);
393 int flag = 1; 398 int flag = 1;
394 setsockopt(reader->socket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag)); 399 setsockopt(reader->socket, IPPROTO_TCP, TCP_NODELAY, (const char *)&flag, sizeof(flag));
395 } 400 }
396 401
402 static void read_from_socket(event_reader *reader)
403 {
404 if (reader->storage - (reader->buffer.size - reader->buffer.cur_pos) < 128 * 1024) {
405 reader->storage *= 2;
406 uint8_t *new_buf = malloc(reader->storage);
407 memcpy(new_buf, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos);
408 free(reader->buffer.data);
409 reader->buffer.data = new_buf;
410 reader->buffer.size -= reader->buffer.cur_pos;
411 reader->buffer.cur_pos = 0;
412 } else if (reader->buffer.cur_pos >= reader->buffer.size/2 && reader->buffer.size >= reader->storage/2) {
413 memmove(reader->buffer.data, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos);
414 reader->buffer.size -= reader->buffer.cur_pos;
415 reader->buffer.cur_pos = 0;
416 }
417 int bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0);
418 if (bytes >= 0) {
419 reader->buffer.size += bytes;
420 } else if (!socket_error_is_wouldblock()) {
421 fatal_error("Connection closed, error = %X\n", socket_last_error());
422 }
423 }
424
425 void reader_ensure_data(event_reader *reader, size_t bytes)
426 {
427 if (reader->socket && reader->buffer.size - reader->buffer.cur_pos < bytes) {
428 socket_blocking(reader->socket, 1);
429 while (reader->buffer.size - reader->buffer.cur_pos < bytes) {
430 read_from_socket(reader);
431 }
432 socket_blocking(reader->socket, 0);
433 }
434 }
435
397 uint8_t reader_next_event(event_reader *reader, uint32_t *cycle_out) 436 uint8_t reader_next_event(event_reader *reader, uint32_t *cycle_out)
398 { 437 {
399 if (reader->repeat_remaining) { 438 if (reader->repeat_remaining) {
400 reader->repeat_remaining--; 439 reader->repeat_remaining--;
401 *cycle_out = reader->last_cycle + reader->repeat_delta; 440 *cycle_out = reader->last_cycle + reader->repeat_delta;
402 reader->last_cycle = *cycle_out; 441 reader->last_cycle = *cycle_out;
403 return reader->repeat_event; 442 return reader->repeat_event;
404 } 443 }
405 if (reader->socket) { 444 if (reader->socket) {
406 uint8_t blocking = 0; 445 read_from_socket(reader);
407 if (reader->buffer.size - reader->buffer.cur_pos < 9) { 446 reader_ensure_data(reader, 1);
408 //set back to block mode
409 socket_blocking(reader->socket, 1);
410 blocking = 1;
411 }
412 if (reader->storage - (reader->buffer.size - reader->buffer.cur_pos) < 128 * 1024) {
413 reader->storage *= 2;
414 uint8_t *new_buf = malloc(reader->storage);
415 memcpy(new_buf, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos);
416 free(reader->buffer.data);
417 reader->buffer.data = new_buf;
418 reader->buffer.size -= reader->buffer.cur_pos;
419 reader->buffer.cur_pos = 0;
420 } else if (reader->buffer.cur_pos >= reader->buffer.size/2 && reader->buffer.size >= reader->storage/2) {
421 memmove(reader->buffer.data, reader->buffer.data + reader->buffer.cur_pos, reader->buffer.size - reader->buffer.cur_pos);
422 reader->buffer.size -= reader->buffer.cur_pos;
423 reader->buffer.cur_pos = 0;
424 }
425 int bytes = recv(reader->socket, reader->buffer.data + reader->buffer.size, reader->storage - reader->buffer.size, 0);
426 if (bytes >= 0) {
427 reader->buffer.size += bytes;
428 if (blocking && reader->buffer.size - reader->buffer.cur_pos >= 9) {
429 socket_blocking(reader->socket, 0);
430 }
431 } else if (!socket_error_is_wouldblock()) {
432 printf("Connection closed, error = %X\n", socket_last_error());
433 }
434 } 447 }
435 uint8_t header = load_int8(&reader->buffer); 448 uint8_t header = load_int8(&reader->buffer);
436 uint8_t ret; 449 uint8_t ret;
437 uint32_t delta; 450 uint32_t delta;
438 uint8_t multi_start = 0; 451 uint8_t multi_start = 0;
439 if ((header & 0xF0) == (EVENT_MULTI << 4)) { 452 if ((header & 0xF0) == (EVENT_MULTI << 4)) {
440 reader->repeat_remaining = (header & 0xF) + 1; 453 reader->repeat_remaining = (header & 0xF) + 1;
441 multi_start = 1; 454 multi_start = 1;
455 reader_ensure_data(reader, 1);
442 header = load_int8(&reader->buffer); 456 header = load_int8(&reader->buffer);
443 } 457 }
444 if ((header & 0xF0) < FORMAT_3BYTE) { 458 if ((header & 0xF0) < FORMAT_3BYTE) {
445 delta = (header & 0xF) + 16; 459 delta = (header & 0xF) + 16;
446 ret = header >> 4; 460 ret = header >> 4;
447 } else if ((header & 0xF0) == FORMAT_3BYTE) { 461 } else if ((header & 0xF0) == FORMAT_3BYTE) {
462 reader_ensure_data(reader, 2);
448 delta = load_int16(&reader->buffer); 463 delta = load_int16(&reader->buffer);
449 ret = header & 0xF; 464 ret = header & 0xF;
450 } else { 465 } else {
466 reader_ensure_data(reader, 3);
451 delta = load_int8(&reader->buffer) << 16; 467 delta = load_int8(&reader->buffer) << 16;
452 //sign extend 24-bit delta to 32-bit 468 //sign extend 24-bit delta to 32-bit
453 if (delta & 0x800000) { 469 if (delta & 0x800000) {
454 delta |= 0xFF000000; 470 delta |= 0xFF000000;
455 } 471 }
461 reader->repeat_delta = delta; 477 reader->repeat_delta = delta;
462 } 478 }
463 *cycle_out = reader->last_cycle + delta; 479 *cycle_out = reader->last_cycle + delta;
464 reader->last_cycle = *cycle_out; 480 reader->last_cycle = *cycle_out;
465 if (ret == EVENT_ADJUST) { 481 if (ret == EVENT_ADJUST) {
482 reader_ensure_data(reader, 4);
466 size_t old_pos = reader->buffer.cur_pos; 483 size_t old_pos = reader->buffer.cur_pos;
467 uint32_t adjust = load_int32(&reader->buffer); 484 uint32_t adjust = load_int32(&reader->buffer);
468 reader->buffer.cur_pos = old_pos; 485 reader->buffer.cur_pos = old_pos;
469 reader->last_cycle -= adjust; 486 reader->last_cycle -= adjust;
470 } else if (ret == EVENT_STATE) { 487 } else if (ret == EVENT_STATE) {
488 reader_ensure_data(reader, 8);
471 reader->last_cycle = load_int32(&reader->buffer); 489 reader->last_cycle = load_int32(&reader->buffer);
472 reader->last_word_address = load_int8(&reader->buffer) << 16; 490 reader->last_word_address = load_int8(&reader->buffer) << 16;
473 reader->last_word_address |= load_int16(&reader->buffer); 491 reader->last_word_address |= load_int16(&reader->buffer);
474 reader->last_byte_address = load_int16(&reader->buffer); 492 reader->last_byte_address = load_int16(&reader->buffer);
475 } 493 }