mirror of
https://github.com/tribufu/node-gamedig
synced 2026-05-18 09:35:50 +00:00
Initial es6 async conversion work
This commit is contained in:
parent
a054557f10
commit
77b2cc1c7f
10 changed files with 773 additions and 748 deletions
|
|
@ -1,9 +1,11 @@
|
|||
const EventEmitter = require('events').EventEmitter,
|
||||
dns = require('dns'),
|
||||
net = require('net'),
|
||||
async = require('async'),
|
||||
Reader = require('../lib/reader'),
|
||||
HexUtil = require('../lib/HexUtil');
|
||||
HexUtil = require('../lib/HexUtil'),
|
||||
util = require('util'),
|
||||
dnsLookupAsync = util.promisify(dns.lookup),
|
||||
dnsResolveAsync = util.promisify(dns.resolve);
|
||||
|
||||
class Core extends EventEmitter {
|
||||
constructor() {
|
||||
|
|
@ -13,23 +15,15 @@ class Core extends EventEmitter {
|
|||
attemptTimeout: 10000,
|
||||
maxAttempts: 1
|
||||
};
|
||||
this.attempt = 1;
|
||||
this.finished = false;
|
||||
this.encoding = 'utf8';
|
||||
this.byteorder = 'le';
|
||||
this.delimiter = '\0';
|
||||
this.srvRecord = null;
|
||||
this.attemptTimeoutTimer = null;
|
||||
}
|
||||
|
||||
fatal(err,noretry) {
|
||||
if(!noretry && this.attempt < this.options.maxAttempts) {
|
||||
this.attempt++;
|
||||
this.start();
|
||||
return;
|
||||
}
|
||||
|
||||
this.done({error: err.toString()});
|
||||
this.attemptAbortables = new Set();
|
||||
this.udpCallback = null;
|
||||
this.udpLocked = false;
|
||||
this.lastAbortableId = 0;
|
||||
}
|
||||
|
||||
initState() {
|
||||
|
|
@ -46,128 +40,138 @@ class Core extends EventEmitter {
|
|||
};
|
||||
}
|
||||
|
||||
finalizeState(state) {}
|
||||
// Run all attempts
|
||||
async runAll() {
|
||||
let result = null;
|
||||
let lastError = null;
|
||||
for (let attempt = 1; attempt <= this.options.maxAttempts; attempt++) {
|
||||
try {
|
||||
result = await this.runOnceSafe();
|
||||
result.query.attempts = attempt;
|
||||
break;
|
||||
} catch (e) {
|
||||
lastError = e;
|
||||
}
|
||||
}
|
||||
|
||||
finish(state) {
|
||||
this.finalizeState(state);
|
||||
this.done(state);
|
||||
if (result === null) {
|
||||
throw lastError;
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
done(state) {
|
||||
if(this.finished) return;
|
||||
// Runs a single attempt with a timeout and cleans up afterward
|
||||
async runOnceSafe() {
|
||||
try {
|
||||
const result = await this.timedPromise(this.runOnce(), this.options.attemptTimeout, "Attempt");
|
||||
if (this.attemptAbortables.size) {
|
||||
let out = [];
|
||||
for (const abortable of this.attemptAbortables) {
|
||||
out.push(abortable.id + " " + abortable.stack);
|
||||
}
|
||||
throw new Error('Query succeeded, but abortables were not empty (async leak?):\n' + out.join('\n---\n'));
|
||||
}
|
||||
return result;
|
||||
} finally {
|
||||
// Clean up any lingering long-running functions
|
||||
for (const abortable of this.attemptAbortables) {
|
||||
try {
|
||||
abortable.abort();
|
||||
} catch(e) {}
|
||||
}
|
||||
this.attemptAbortables.clear();
|
||||
}
|
||||
}
|
||||
|
||||
if(this.options.notes)
|
||||
timedPromise(promise, timeoutMs, timeoutMsg) {
|
||||
return new Promise((resolve, reject) => {
|
||||
const cancelTimeout = this.setTimeout(
|
||||
() => reject(new Error(timeoutMsg + " - Timed out after " + timeoutMs + "ms")),
|
||||
timeoutMs
|
||||
);
|
||||
promise.finally(cancelTimeout).then(resolve,reject);
|
||||
});
|
||||
}
|
||||
|
||||
async runOnce() {
|
||||
const startMillis = Date.now();
|
||||
const options = this.options;
|
||||
if (('host' in options) && !('address' in options)) {
|
||||
options.address = await this.parseDns(options.host);
|
||||
}
|
||||
if(!('port_query' in options) && 'port' in options) {
|
||||
const offset = options.port_query_offset || 0;
|
||||
options.port_query = options.port + offset;
|
||||
}
|
||||
|
||||
const state = this.initState();
|
||||
await this.run(state);
|
||||
|
||||
if (this.options.notes)
|
||||
state.notes = this.options.notes;
|
||||
|
||||
state.query = {};
|
||||
if('host' in this.options) state.query.host = this.options.host;
|
||||
if('address' in this.options) state.query.address = this.options.address;
|
||||
if('port' in this.options) state.query.port = this.options.port;
|
||||
if('port_query' in this.options) state.query.port_query = this.options.port_query;
|
||||
if ('host' in this.options) state.query.host = this.options.host;
|
||||
if ('address' in this.options) state.query.address = this.options.address;
|
||||
if ('port' in this.options) state.query.port = this.options.port;
|
||||
if ('port_query' in this.options) state.query.port_query = this.options.port_query;
|
||||
state.query.type = this.type;
|
||||
if('pretty' in this) state.query.pretty = this.pretty;
|
||||
state.query.duration = Date.now() - this.startMillis;
|
||||
state.query.attempts = this.attempt;
|
||||
if ('pretty' in this) state.query.pretty = this.pretty;
|
||||
state.query.duration = Date.now() - startMillis;
|
||||
|
||||
this.reset();
|
||||
this.finished = true;
|
||||
this.emit('finished',state);
|
||||
if(this.options.callback) this.options.callback(state);
|
||||
return state;
|
||||
}
|
||||
|
||||
reset() {
|
||||
clearTimeout(this.attemptTimeoutTimer);
|
||||
if(this.timers) {
|
||||
for (const timer of this.timers) {
|
||||
clearTimeout(timer);
|
||||
}
|
||||
}
|
||||
this.timers = [];
|
||||
async run(state) {}
|
||||
|
||||
if(this.tcpSocket) {
|
||||
this.tcpSocket.destroy();
|
||||
delete this.tcpSocket;
|
||||
}
|
||||
|
||||
this.udpTimeoutTimer = false;
|
||||
this.udpCallback = false;
|
||||
}
|
||||
|
||||
start() {
|
||||
const options = this.options;
|
||||
this.reset();
|
||||
|
||||
this.startMillis = Date.now();
|
||||
|
||||
this.attemptTimeoutTimer = setTimeout(() => {
|
||||
this.fatal('timeout');
|
||||
},this.options.attemptTimeout);
|
||||
|
||||
async.series([
|
||||
(c) => {
|
||||
// resolve host names
|
||||
if(!('host' in options)) return c();
|
||||
if(options.host.match(/\d+\.\d+\.\d+\.\d+/)) {
|
||||
options.address = options.host;
|
||||
c();
|
||||
} else {
|
||||
this.parseDns(options.host,c);
|
||||
}
|
||||
},
|
||||
(c) => {
|
||||
// calculate query port if needed
|
||||
if(!('port_query' in options) && 'port' in options) {
|
||||
const offset = options.port_query_offset || 0;
|
||||
options.port_query = options.port + offset;
|
||||
}
|
||||
c();
|
||||
},
|
||||
(c) => {
|
||||
// run
|
||||
this.run(this.initState());
|
||||
}
|
||||
|
||||
]);
|
||||
}
|
||||
|
||||
run() {}
|
||||
|
||||
parseDns(host,c) {
|
||||
const resolveStandard = (host,c) => {
|
||||
/**
|
||||
* @param {string} host
|
||||
* @returns {Promise<string>}
|
||||
*/
|
||||
async parseDns(host) {
|
||||
const isIp = (host) => {
|
||||
return !!host.match(/\d+\.\d+\.\d+\.\d+/);
|
||||
};
|
||||
const resolveStandard = async (host) => {
|
||||
if(isIp(host)) return host;
|
||||
if(this.debug) console.log("Standard DNS Lookup: " + host);
|
||||
dns.lookup(host, (err,address,family) => {
|
||||
if(err) return this.fatal(err);
|
||||
if(this.debug) console.log(address);
|
||||
this.options.address = address;
|
||||
c();
|
||||
});
|
||||
const {address,family} = await dnsLookupAsync(host);
|
||||
if(this.debug) console.log(address);
|
||||
return address;
|
||||
};
|
||||
|
||||
const resolveSrv = (srv,host,c) => {
|
||||
const resolveSrv = async (srv,host) => {
|
||||
if(isIp(host)) return host;
|
||||
if(this.debug) console.log("SRV DNS Lookup: " + srv+'.'+host);
|
||||
dns.resolve(srv+'.'+host, 'SRV', (err,addresses) => {
|
||||
if(this.debug) console.log(err, addresses);
|
||||
if(err) return resolveStandard(host,c);
|
||||
if(addresses.length >= 1) {
|
||||
const line = addresses[0];
|
||||
this.options.port = line.port;
|
||||
const srvhost = line.name;
|
||||
|
||||
if(srvhost.match(/\d+\.\d+\.\d+\.\d+/)) {
|
||||
this.options.address = srvhost;
|
||||
c();
|
||||
} else {
|
||||
// resolve yet again
|
||||
resolveStandard(srvhost,c);
|
||||
}
|
||||
return;
|
||||
let records;
|
||||
try {
|
||||
records = await dnsResolveAsync(srv + '.' + host, 'SRV');
|
||||
if(this.debug) console.log(records);
|
||||
if(records.length >= 1) {
|
||||
const record = records[0];
|
||||
this.options.port = record.port;
|
||||
const srvhost = record.name;
|
||||
return await resolveStandard(srvhost);
|
||||
}
|
||||
return resolveStandard(host,c);
|
||||
});
|
||||
} catch(e) {
|
||||
if (this.debug) console.log(e.toString());
|
||||
}
|
||||
return await resolveStandard(host);
|
||||
};
|
||||
|
||||
if(this.srvRecord) resolveSrv(this.srvRecord,host,c);
|
||||
else resolveStandard(host,c);
|
||||
if(this.srvRecord) return await resolveSrv(this.srvRecord, host);
|
||||
else return await resolveStandard(host);
|
||||
}
|
||||
|
||||
addAbortable(fn) {
|
||||
const id = ++this.lastAbortableId;
|
||||
const stack = new Error().stack;
|
||||
const entry = { id: id, abort: fn, stack: stack };
|
||||
if (this.debug) console.log("Adding abortable: " + id);
|
||||
this.attemptAbortables.add(entry);
|
||||
return () => {
|
||||
if (this.debug) console.log("Removing abortable: " + id);
|
||||
this.attemptAbortables.delete(entry);
|
||||
}
|
||||
}
|
||||
|
||||
// utils
|
||||
|
|
@ -184,125 +188,169 @@ class Core extends EventEmitter {
|
|||
}
|
||||
}
|
||||
}
|
||||
setTimeout(c,t) {
|
||||
if(this.finished) return 0;
|
||||
const id = setTimeout(c,t);
|
||||
this.timers.push(id);
|
||||
return id;
|
||||
}
|
||||
|
||||
trueTest(str) {
|
||||
if(typeof str === 'boolean') return str;
|
||||
if(typeof str === 'number') return str !== 0;
|
||||
if(typeof str === 'string') {
|
||||
if(str.toLowerCase() === 'true') return true;
|
||||
if(str === 'yes') return true;
|
||||
if(str.toLowerCase() === 'yes') return true;
|
||||
if(str === '1') return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
_tcpConnect(c) {
|
||||
if(this.tcpSocket) return c(this.tcpSocket);
|
||||
|
||||
let connected = false;
|
||||
let received = Buffer.from([]);
|
||||
/**
|
||||
* @param {function(Socket):Promise} fn
|
||||
* @returns {Promise<Socket>}
|
||||
*/
|
||||
async withTcp(fn) {
|
||||
const address = this.options.address;
|
||||
const port = this.options.port_query;
|
||||
|
||||
const socket = this.tcpSocket = net.connect(port,address,() => {
|
||||
if(this.debug) console.log(address+':'+port+" TCPCONNECTED");
|
||||
connected = true;
|
||||
c(socket);
|
||||
});
|
||||
const socket = net.connect(port,address);
|
||||
socket.setNoDelay(true);
|
||||
if(this.debug) console.log(address+':'+port+" TCPCONNECT");
|
||||
const cancelAbortable = this.addAbortable(() => socket.destroy());
|
||||
|
||||
const writeHook = socket.write;
|
||||
socket.write = (...args) => {
|
||||
if(this.debug) {
|
||||
if(this.debug) {
|
||||
console.log(address+':'+port+" TCP Connecting");
|
||||
const writeHook = socket.write;
|
||||
socket.write = (...args) => {
|
||||
console.log(address+':'+port+" TCP-->");
|
||||
console.log(HexUtil.debugDump(args[0]));
|
||||
}
|
||||
writeHook.apply(socket,args);
|
||||
};
|
||||
|
||||
socket.on('error', () => {});
|
||||
socket.on('close', () => {
|
||||
if(!this.tcpCallback) return;
|
||||
if(connected) return this.fatal('Socket closed while waiting on TCP');
|
||||
else return this.fatal('TCP Connection Refused');
|
||||
});
|
||||
socket.on('data', (data) => {
|
||||
if(!this.tcpCallback) return;
|
||||
if(this.debug) {
|
||||
console.log(address+':'+port+" <--TCP");
|
||||
console.log(HexUtil.debugDump(data));
|
||||
}
|
||||
received = Buffer.concat([received,data]);
|
||||
if(this.tcpCallback(received)) {
|
||||
clearTimeout(this.tcpTimeoutTimer);
|
||||
this.tcpCallback = false;
|
||||
received = Buffer.from([]);
|
||||
}
|
||||
});
|
||||
}
|
||||
tcpSend(buffer,ondata) {
|
||||
process.nextTick(() => {
|
||||
if(this.tcpCallback) return this.fatal('Attempted to send TCP packet while still waiting on a managed response');
|
||||
this._tcpConnect((socket) => {
|
||||
socket.write(buffer);
|
||||
writeHook.apply(socket,args);
|
||||
};
|
||||
socket.on('error', e => console.log('TCP Error: ' + e));
|
||||
socket.on('close', () => console.log('TCP Closed'));
|
||||
socket.on('data', (data) => {
|
||||
if(this.debug) {
|
||||
console.log(address+':'+port+" <--TCP");
|
||||
console.log(HexUtil.debugDump(data));
|
||||
}
|
||||
});
|
||||
if(!ondata) return;
|
||||
socket.on('ready', () => console.log(address+':'+port+" TCP Connected"));
|
||||
}
|
||||
|
||||
this.tcpTimeoutTimer = this.setTimeout(() => {
|
||||
this.tcpCallback = false;
|
||||
this.fatal('TCP Watchdog Timeout');
|
||||
},this.options.socketTimeout);
|
||||
this.tcpCallback = ondata;
|
||||
});
|
||||
try {
|
||||
await this.timedPromise(
|
||||
new Promise((resolve,reject) => {
|
||||
socket.on('ready', resolve);
|
||||
socket.on('close', () => reject(new Error('TCP Connection Refused')));
|
||||
}),
|
||||
this.options.socketTimeout,
|
||||
'TCP Opening'
|
||||
);
|
||||
return await fn(socket);
|
||||
} finally {
|
||||
cancelAbortable();
|
||||
socket.destroy();
|
||||
}
|
||||
}
|
||||
|
||||
udpSend(buffer,onpacket,ontimeout) {
|
||||
process.nextTick(() => {
|
||||
if(this.udpCallback) return this.fatal('Attempted to send UDP packet while still waiting on a managed response');
|
||||
this._udpSendNow(buffer);
|
||||
if(!onpacket) return;
|
||||
|
||||
this.udpTimeoutTimer = this.setTimeout(() => {
|
||||
this.udpCallback = false;
|
||||
let timeout = false;
|
||||
if(!ontimeout || ontimeout() !== true) timeout = true;
|
||||
if(timeout) this.fatal('UDP Watchdog Timeout');
|
||||
},this.options.socketTimeout);
|
||||
this.udpCallback = onpacket;
|
||||
});
|
||||
setTimeout(callback, time) {
|
||||
let cancelAbortable;
|
||||
const onTimeout = () => {
|
||||
cancelAbortable();
|
||||
callback();
|
||||
};
|
||||
const timeout = setTimeout(onTimeout, time);
|
||||
cancelAbortable = this.addAbortable(() => clearTimeout(timeout));
|
||||
return () => {
|
||||
cancelAbortable();
|
||||
clearTimeout(timeout);
|
||||
}
|
||||
}
|
||||
_udpSendNow(buffer) {
|
||||
if(!('port_query' in this.options)) return this.fatal('Attempted to send without setting a port');
|
||||
if(!('address' in this.options)) return this.fatal('Attempted to send without setting an address');
|
||||
|
||||
/**
|
||||
* @param {Socket} socket
|
||||
* @param {Buffer} buffer
|
||||
* @param {function(Buffer):boolean} ondata
|
||||
* @returns {Promise}
|
||||
*/
|
||||
async tcpSend(socket,buffer,ondata) {
|
||||
return await this.timedPromise(
|
||||
new Promise(async (resolve,reject) => {
|
||||
let received = Buffer.from([]);
|
||||
const onData = (data) => {
|
||||
received = Buffer.concat([received, data]);
|
||||
const result = ondata(received);
|
||||
if (result !== undefined) {
|
||||
socket.off('data', onData);
|
||||
resolve(result);
|
||||
}
|
||||
};
|
||||
socket.on('data', onData);
|
||||
socket.write(buffer);
|
||||
}),
|
||||
this.options.socketTimeout,
|
||||
'TCP'
|
||||
);
|
||||
}
|
||||
|
||||
async withUdpLock(fn) {
|
||||
if (this.udpLocked) {
|
||||
throw new Error('Attempted to lock UDP when already locked');
|
||||
}
|
||||
this.udpLocked = true;
|
||||
try {
|
||||
return await fn();
|
||||
} finally {
|
||||
this.udpLocked = false;
|
||||
this.udpCallback = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @param {Buffer|string} buffer
|
||||
* @param {function(Buffer):T} onPacket
|
||||
* @param {(function():T)=} onTimeout
|
||||
* @returns Promise<T>
|
||||
* @template T
|
||||
*/
|
||||
async udpSend(buffer,onPacket,onTimeout) {
|
||||
if(!('port_query' in this.options)) throw new Error('Attempted to send without setting a port');
|
||||
if(!('address' in this.options)) throw new Error('Attempted to send without setting an address');
|
||||
if(typeof buffer === 'string') buffer = Buffer.from(buffer,'binary');
|
||||
|
||||
if(this.debug) {
|
||||
console.log(this.options.address+':'+this.options.port_query+" UDP-->");
|
||||
console.log(HexUtil.debugDump(buffer));
|
||||
}
|
||||
this.udpSocket.send(buffer,0,buffer.length,this.options.port_query,this.options.address);
|
||||
|
||||
return await this.withUdpLock(async() => {
|
||||
this.udpSocket.send(buffer,0,buffer.length,this.options.port_query,this.options.address);
|
||||
|
||||
return await new Promise((resolve,reject) => {
|
||||
const cancelTimeout = this.setTimeout(() => {
|
||||
if (this.debug) console.log("UDP timeout detected");
|
||||
let success = false;
|
||||
if (onTimeout) {
|
||||
const result = onTimeout();
|
||||
if (result !== undefined) {
|
||||
if (this.debug) console.log("UDP timeout resolved by callback");
|
||||
resolve(result);
|
||||
success = true;
|
||||
}
|
||||
}
|
||||
if (!success) {
|
||||
reject(new Error('UDP Watchdog Timeout'));
|
||||
}
|
||||
},this.options.socketTimeout);
|
||||
|
||||
this.udpCallback = (buffer) => {
|
||||
const result = onPacket(buffer);
|
||||
if(result !== undefined) {
|
||||
if (this.debug) console.log("UDP send finished by callback");
|
||||
cancelTimeout();
|
||||
resolve(result);
|
||||
}
|
||||
};
|
||||
});
|
||||
});
|
||||
}
|
||||
_udpResponse(buffer) {
|
||||
if(this.udpCallback) {
|
||||
const result = this.udpCallback(buffer);
|
||||
if(result === true) {
|
||||
// we're done with this udp session
|
||||
clearTimeout(this.udpTimeoutTimer);
|
||||
this.udpCallback = false;
|
||||
}
|
||||
} else {
|
||||
this.udpResponse(buffer);
|
||||
}
|
||||
|
||||
_udpIncoming(buffer) {
|
||||
this.udpCallback && this.udpCallback(buffer);
|
||||
}
|
||||
udpResponse() {}
|
||||
}
|
||||
|
||||
module.exports = Core;
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue