mirror of
https://github.com/tribufu/node-gamedig
synced 2026-05-18 09:35:50 +00:00
Additional async rewrite
This commit is contained in:
parent
efe12a00aa
commit
29ce0b82d0
24 changed files with 654 additions and 470 deletions
|
|
@ -6,43 +6,24 @@ const EventEmitter = require('events').EventEmitter,
|
|||
util = require('util'),
|
||||
dnsLookupAsync = util.promisify(dns.lookup),
|
||||
dnsResolveAsync = util.promisify(dns.resolve),
|
||||
requestAsync = require('request-promise');
|
||||
requestAsync = require('request-promise'),
|
||||
Promises = require('../lib/Promises');
|
||||
|
||||
class Core extends EventEmitter {
|
||||
constructor() {
|
||||
super();
|
||||
this.options = {
|
||||
socketTimeout: 2000,
|
||||
attemptTimeout: 10000,
|
||||
maxAttempts: 1
|
||||
};
|
||||
this.encoding = 'utf8';
|
||||
this.byteorder = 'le';
|
||||
this.delimiter = '\0';
|
||||
this.srvRecord = null;
|
||||
this.abortedPromise = null;
|
||||
|
||||
this.asyncLeaks = new Set();
|
||||
this.udpCallback = null;
|
||||
this.udpLocked = false;
|
||||
this.lastAsyncLeakId = 0;
|
||||
// Sent to us by QueryRunner
|
||||
this.options = null;
|
||||
this.udpSocket = null;
|
||||
}
|
||||
|
||||
initState() {
|
||||
return {
|
||||
name: '',
|
||||
map: '',
|
||||
password: false,
|
||||
|
||||
raw: {},
|
||||
|
||||
maxplayers: 0,
|
||||
players: [],
|
||||
bots: []
|
||||
};
|
||||
}
|
||||
|
||||
// Run all attempts
|
||||
async runAll() {
|
||||
async runAllAttempts() {
|
||||
let result = null;
|
||||
let lastError = null;
|
||||
for (let attempt = 1; attempt <= this.options.maxAttempts; attempt++) {
|
||||
|
|
@ -63,38 +44,27 @@ class Core extends EventEmitter {
|
|||
|
||||
// Runs a single attempt with a timeout and cleans up afterward
|
||||
async runOnceSafe() {
|
||||
try {
|
||||
const result = await this.timedPromise(this.runOnce(), this.options.attemptTimeout, "Attempt");
|
||||
if (this.asyncLeaks.size) {
|
||||
let out = [];
|
||||
for (const leak of this.asyncLeaks) {
|
||||
out.push(leak.id + " " + leak.stack);
|
||||
}
|
||||
throw new Error('Query succeeded, but async leak was detected:\n' + out.join('\n---\n'));
|
||||
}
|
||||
return result;
|
||||
} finally {
|
||||
// Clean up any lingering long-running functions
|
||||
for (const leak of this.asyncLeaks) {
|
||||
try {
|
||||
leak.cleanup();
|
||||
} catch(e) {
|
||||
this.debugLog("Error during async cleanup: " + e.stack);
|
||||
}
|
||||
}
|
||||
this.asyncLeaks.clear();
|
||||
}
|
||||
}
|
||||
|
||||
timedPromise(promise, timeoutMs, timeoutMsg) {
|
||||
return new Promise((resolve,reject) => {
|
||||
const cancelTimeout = this.setTimeout(
|
||||
() => reject(new Error(timeoutMsg + " - Timed out after " + timeoutMs + "ms")),
|
||||
timeoutMs
|
||||
);
|
||||
promise = promise.finally(cancelTimeout);
|
||||
promise.then(resolve,reject);
|
||||
let abortCall = null;
|
||||
this.abortedPromise = new Promise((resolve,reject) => {
|
||||
abortCall = () => reject("Query is finished -- cancelling outstanding promises");
|
||||
});
|
||||
|
||||
// Make sure that if this promise isn't attached to, it doesn't throw a unhandled promise rejection
|
||||
this.abortedPromise.catch(() => {});
|
||||
|
||||
let timeout;
|
||||
try {
|
||||
const promise = this.runOnce();
|
||||
timeout = Promises.createTimeout(this.options.attemptTimeout, "Attempt");
|
||||
return await Promise.race([promise,timeout]);
|
||||
} finally {
|
||||
timeout && timeout.cancel();
|
||||
try {
|
||||
abortCall();
|
||||
} catch(e) {
|
||||
this.debugLog("Error during abort cleanup: " + e.stack);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async runOnce() {
|
||||
|
|
@ -103,25 +73,37 @@ class Core extends EventEmitter {
|
|||
if (('host' in options) && !('address' in options)) {
|
||||
options.address = await this.parseDns(options.host);
|
||||
}
|
||||
if(!('port_query' in options) && 'port' in options) {
|
||||
const offset = options.port_query_offset || 0;
|
||||
options.port_query = options.port + offset;
|
||||
}
|
||||
|
||||
const state = this.initState();
|
||||
const state = {
|
||||
name: '',
|
||||
map: '',
|
||||
password: false,
|
||||
|
||||
raw: {},
|
||||
|
||||
maxplayers: 0,
|
||||
players: [],
|
||||
bots: []
|
||||
};
|
||||
|
||||
await this.run(state);
|
||||
|
||||
if (this.options.notes)
|
||||
state.notes = this.options.notes;
|
||||
|
||||
state.query = {};
|
||||
if ('host' in this.options) state.query.host = this.options.host;
|
||||
if ('address' in this.options) state.query.address = this.options.address;
|
||||
if ('port' in this.options) state.query.port = this.options.port;
|
||||
if ('port_query' in this.options) state.query.port_query = this.options.port_query;
|
||||
state.query.type = this.type;
|
||||
if ('pretty' in this) state.query.pretty = this.pretty;
|
||||
state.query.duration = Date.now() - startMillis;
|
||||
// because lots of servers prefix with spaces to try to appear first
|
||||
state.name = state.name.trim();
|
||||
|
||||
state.duration = Date.now() - startMillis;
|
||||
if (!('connect' in state)) {
|
||||
state.connect = ''
|
||||
+ (state.gameHost || this.options.host || this.options.address)
|
||||
+ ':'
|
||||
+ (state.gamePort || this.options.port)
|
||||
}
|
||||
delete state.gameHost;
|
||||
delete state.gamePort;
|
||||
|
||||
|
||||
return state;
|
||||
}
|
||||
|
|
@ -166,18 +148,6 @@ class Core extends EventEmitter {
|
|||
else return await resolveStandard(host);
|
||||
}
|
||||
|
||||
addAsyncLeak(fn) {
|
||||
const id = ++this.lastAsyncLeakId;
|
||||
const stack = new Error().stack;
|
||||
const entry = { id: id, cleanup: fn, stack: stack };
|
||||
this.debugLog("Registering async leak: " + id);
|
||||
this.asyncLeaks.add(entry);
|
||||
return () => {
|
||||
this.debugLog("Removing async leak: " + id);
|
||||
this.asyncLeaks.delete(entry);
|
||||
}
|
||||
}
|
||||
|
||||
// utils
|
||||
/** @returns {Reader} */
|
||||
reader(buffer) {
|
||||
|
|
@ -204,6 +174,12 @@ class Core extends EventEmitter {
|
|||
return false;
|
||||
}
|
||||
|
||||
assertValidPort(port) {
|
||||
if (!port || port < 1 || port > 65535) {
|
||||
throw new Error("Invalid tcp/ip port: " + port);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @template T
|
||||
* @param {function(Socket):Promise<T>} fn
|
||||
|
|
@ -211,56 +187,45 @@ class Core extends EventEmitter {
|
|||
*/
|
||||
async withTcp(fn) {
|
||||
const address = this.options.address;
|
||||
const port = this.options.port_query;
|
||||
const port = this.options.port;
|
||||
this.assertValidPort(port);
|
||||
|
||||
const socket = net.connect(port,address);
|
||||
socket.setNoDelay(true);
|
||||
const cancelAsyncLeak = this.addAsyncLeak(() => socket.destroy());
|
||||
let socket, connectionTimeout;
|
||||
try {
|
||||
socket = net.connect(port,address);
|
||||
socket.setNoDelay(true);
|
||||
|
||||
this.debugLog(log => {
|
||||
this.debugLog(address+':'+port+" TCP Connecting");
|
||||
const writeHook = socket.write;
|
||||
socket.write = (...args) => {
|
||||
log(address+':'+port+" TCP-->");
|
||||
log(HexUtil.debugDump(args[0]));
|
||||
writeHook.apply(socket,args);
|
||||
};
|
||||
socket.on('error', e => log('TCP Error: ' + e));
|
||||
socket.on('close', () => log('TCP Closed'));
|
||||
socket.on('data', (data) => {
|
||||
this.debugLog(log => {
|
||||
this.debugLog(address+':'+port+" TCP Connecting");
|
||||
const writeHook = socket.write;
|
||||
socket.write = (...args) => {
|
||||
log(address+':'+port+" TCP-->");
|
||||
log(HexUtil.debugDump(args[0]));
|
||||
writeHook.apply(socket,args);
|
||||
};
|
||||
socket.on('error', e => log('TCP Error: ' + e));
|
||||
socket.on('close', () => log('TCP Closed'));
|
||||
socket.on('data', (data) => {
|
||||
log(address+':'+port+" <--TCP");
|
||||
log(data);
|
||||
});
|
||||
socket.on('ready', () => log(address+':'+port+" TCP Connected"));
|
||||
});
|
||||
socket.on('ready', () => log(address+':'+port+" TCP Connected"));
|
||||
});
|
||||
|
||||
try {
|
||||
await this.timedPromise(
|
||||
new Promise((resolve,reject) => {
|
||||
socket.on('ready', resolve);
|
||||
socket.on('close', () => reject(new Error('TCP Connection Refused')));
|
||||
}),
|
||||
this.options.socketTimeout,
|
||||
'TCP Opening'
|
||||
);
|
||||
const connectionPromise = new Promise((resolve,reject) => {
|
||||
socket.on('ready', resolve);
|
||||
socket.on('close', () => reject(new Error('TCP Connection Refused')));
|
||||
});
|
||||
connectionTimeout = Promises.createTimeout(this.options.socketTimeout, 'TCP Opening');
|
||||
await Promise.race([
|
||||
connectionPromise,
|
||||
connectionTimeout,
|
||||
this.abortedPromise
|
||||
]);
|
||||
return await fn(socket);
|
||||
} finally {
|
||||
cancelAsyncLeak();
|
||||
socket.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
setTimeout(callback, time) {
|
||||
let cancelAsyncLeak;
|
||||
const onTimeout = () => {
|
||||
cancelAsyncLeak();
|
||||
callback();
|
||||
};
|
||||
const timeout = setTimeout(onTimeout, time);
|
||||
cancelAsyncLeak = this.addAsyncLeak(() => clearTimeout(timeout));
|
||||
return () => {
|
||||
cancelAsyncLeak();
|
||||
clearTimeout(timeout);
|
||||
socket && socket.destroy();
|
||||
connectionTimeout && connectionTimeout.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -272,8 +237,9 @@ class Core extends EventEmitter {
|
|||
* @returns Promise<T>
|
||||
*/
|
||||
async tcpSend(socket,buffer,ondata) {
|
||||
return await this.timedPromise(
|
||||
new Promise(async (resolve,reject) => {
|
||||
let timeout;
|
||||
try {
|
||||
const promise = new Promise(async (resolve, reject) => {
|
||||
let received = Buffer.from([]);
|
||||
const onData = (data) => {
|
||||
received = Buffer.concat([received, data]);
|
||||
|
|
@ -285,22 +251,11 @@ class Core extends EventEmitter {
|
|||
};
|
||||
socket.on('data', onData);
|
||||
socket.write(buffer);
|
||||
}),
|
||||
this.options.socketTimeout,
|
||||
'TCP'
|
||||
);
|
||||
}
|
||||
|
||||
async withUdpLock(fn) {
|
||||
if (this.udpLocked) {
|
||||
throw new Error('Attempted to lock UDP when already locked');
|
||||
}
|
||||
this.udpLocked = true;
|
||||
try {
|
||||
return await fn();
|
||||
});
|
||||
timeout = Promises.createTimeout(this.options.socketTimeout, 'TCP');
|
||||
return await Promise.race([promise, timeout, this.abortedPromise]);
|
||||
} finally {
|
||||
this.udpLocked = false;
|
||||
this.udpCallback = null;
|
||||
timeout && timeout.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -312,72 +267,93 @@ class Core extends EventEmitter {
|
|||
* @template T
|
||||
*/
|
||||
async udpSend(buffer,onPacket,onTimeout) {
|
||||
if(!('port_query' in this.options)) throw new Error('Attempted to send without setting a port');
|
||||
if(!('address' in this.options)) throw new Error('Attempted to send without setting an address');
|
||||
const address = this.options.address;
|
||||
const port = this.options.port;
|
||||
this.assertValidPort(port);
|
||||
|
||||
if(typeof buffer === 'string') buffer = Buffer.from(buffer,'binary');
|
||||
this.debugLog(log => {
|
||||
log(this.options.address+':'+this.options.port_query+" UDP-->");
|
||||
log(address+':'+port+" UDP-->");
|
||||
log(HexUtil.debugDump(buffer));
|
||||
});
|
||||
|
||||
return await this.withUdpLock(async() => {
|
||||
this.udpSocket.send(buffer,0,buffer.length,this.options.port_query,this.options.address);
|
||||
const socket = this.udpSocket;
|
||||
socket.send(buffer, address, port);
|
||||
|
||||
return await new Promise((resolve,reject) => {
|
||||
const cancelTimeout = this.setTimeout(() => {
|
||||
let socketCallback;
|
||||
let timeout;
|
||||
try {
|
||||
const promise = new Promise((resolve, reject) => {
|
||||
socketCallback = (fromAddress, fromPort, buffer) => {
|
||||
try {
|
||||
if (fromAddress !== address) return;
|
||||
if (fromPort !== port) return;
|
||||
this.debugLog(log => {
|
||||
log(fromAddress + ':' + fromPort + " <--UDP");
|
||||
log(HexUtil.debugDump(buffer));
|
||||
});
|
||||
const result = onPacket(buffer);
|
||||
if (result !== undefined) {
|
||||
this.debugLog("UDP send finished by callback");
|
||||
resolve(result);
|
||||
}
|
||||
} catch(e) {
|
||||
reject(e);
|
||||
}
|
||||
};
|
||||
socket.addCallback(socketCallback);
|
||||
});
|
||||
timeout = Promises.createTimeout(this.options.socketTimeout, 'UDP');
|
||||
const wrappedTimeout = new Promise((resolve, reject) => {
|
||||
timeout.catch((e) => {
|
||||
this.debugLog("UDP timeout detected");
|
||||
let success = false;
|
||||
if (onTimeout) {
|
||||
const result = onTimeout();
|
||||
if (result !== undefined) {
|
||||
this.debugLog("UDP timeout resolved by callback");
|
||||
resolve(result);
|
||||
success = true;
|
||||
try {
|
||||
const result = onTimeout();
|
||||
if (result !== undefined) {
|
||||
this.debugLog("UDP timeout resolved by callback");
|
||||
resolve(result);
|
||||
return;
|
||||
}
|
||||
} catch(e) {
|
||||
reject(e);
|
||||
}
|
||||
}
|
||||
if (!success) {
|
||||
reject(new Error('UDP Watchdog Timeout'));
|
||||
}
|
||||
},this.options.socketTimeout);
|
||||
|
||||
this.udpCallback = (buffer) => {
|
||||
const result = onPacket(buffer);
|
||||
if(result !== undefined) {
|
||||
this.debugLog("UDP send finished by callback");
|
||||
cancelTimeout();
|
||||
resolve(result);
|
||||
}
|
||||
};
|
||||
reject(e);
|
||||
});
|
||||
});
|
||||
});
|
||||
return await Promise.race([promise, wrappedTimeout, this.abortedPromise]);
|
||||
} finally {
|
||||
timeout && timeout.cancel();
|
||||
socketCallback && socket.removeCallback(socketCallback);
|
||||
}
|
||||
}
|
||||
|
||||
_udpIncoming(buffer) {
|
||||
this.udpCallback && this.udpCallback(buffer);
|
||||
}
|
||||
|
||||
request(params) {
|
||||
let promise = requestAsync({
|
||||
...params,
|
||||
timeout: this.options.socketTimeout,
|
||||
resolveWithFullResponse: true
|
||||
});
|
||||
const cancelAsyncLeak = this.addAsyncLeak(() => {
|
||||
promise.cancel();
|
||||
});
|
||||
this.debugLog(log => {
|
||||
log(() => params.uri+" HTTP-->");
|
||||
promise
|
||||
.then((response) => log(params.uri+" <--HTTP " + response.statusCode))
|
||||
.catch(()=>{});
|
||||
});
|
||||
promise = promise.finally(cancelAsyncLeak);
|
||||
promise = promise.then(response => response.body);
|
||||
return promise;
|
||||
async request(params) {
|
||||
let requestPromise;
|
||||
try {
|
||||
requestPromise = requestAsync({
|
||||
...params,
|
||||
timeout: this.options.socketTimeout,
|
||||
resolveWithFullResponse: true
|
||||
});
|
||||
this.debugLog(log => {
|
||||
log(() => params.uri + " HTTP-->");
|
||||
requestPromise
|
||||
.then((response) => log(params.uri + " <--HTTP " + response.statusCode))
|
||||
.catch(() => {
|
||||
});
|
||||
});
|
||||
const wrappedPromise = promise.then(response => response.body);
|
||||
return await Promise.race([wrappedPromise, this.abortedPromise]);
|
||||
} finally {
|
||||
requestPromise && requestPromise.cancel();
|
||||
}
|
||||
}
|
||||
|
||||
debugLog(...args) {
|
||||
if (!this.debug) return;
|
||||
if (!this.options.debug) return;
|
||||
try {
|
||||
if(args[0] instanceof Buffer) {
|
||||
this.debugLog(HexUtil.debugDump(args[0]));
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue