More async conversions

This commit is contained in:
mmorrison 2019-01-09 05:35:11 -06:00
parent 77b2cc1c7f
commit 9b8423b20a
15 changed files with 859 additions and 704 deletions

View file

@ -5,7 +5,8 @@ const EventEmitter = require('events').EventEmitter,
HexUtil = require('../lib/HexUtil'),
util = require('util'),
dnsLookupAsync = util.promisify(dns.lookup),
dnsResolveAsync = util.promisify(dns.resolve);
dnsResolveAsync = util.promisify(dns.resolve),
requestAsync = require('request-promise');
class Core extends EventEmitter {
constructor() {
@ -20,10 +21,10 @@ class Core extends EventEmitter {
this.delimiter = '\0';
this.srvRecord = null;
this.attemptAbortables = new Set();
this.asyncLeaks = new Set();
this.udpCallback = null;
this.udpLocked = false;
this.lastAbortableId = 0;
this.lastAsyncLeakId = 0;
}
initState() {
@ -64,22 +65,24 @@ class Core extends EventEmitter {
async runOnceSafe() {
try {
const result = await this.timedPromise(this.runOnce(), this.options.attemptTimeout, "Attempt");
if (this.attemptAbortables.size) {
if (this.asyncLeaks.size) {
let out = [];
for (const abortable of this.attemptAbortables) {
out.push(abortable.id + " " + abortable.stack);
for (const leak of this.asyncLeaks) {
out.push(leak.id + " " + leak.stack);
}
throw new Error('Query succeeded, but abortables were not empty (async leak?):\n' + out.join('\n---\n'));
throw new Error('Query succeeded, but async leak was detected:\n' + out.join('\n---\n'));
}
return result;
} finally {
// Clean up any lingering long-running functions
for (const abortable of this.attemptAbortables) {
for (const leak of this.asyncLeaks) {
try {
abortable.abort();
} catch(e) {}
leak.cleanup();
} catch(e) {
if (this.debug) console.log("Error during async cleanup: " + e.stack);
}
}
this.attemptAbortables.clear();
this.asyncLeaks.clear();
}
}
@ -162,15 +165,15 @@ class Core extends EventEmitter {
else return await resolveStandard(host);
}
addAbortable(fn) {
const id = ++this.lastAbortableId;
addAsyncLeak(fn) {
const id = ++this.lastAsyncLeakId;
const stack = new Error().stack;
const entry = { id: id, abort: fn, stack: stack };
if (this.debug) console.log("Adding abortable: " + id);
this.attemptAbortables.add(entry);
const entry = { id: id, cleanup: fn, stack: stack };
if (this.debug) console.log("Registering async leak: " + id);
this.asyncLeaks.add(entry);
return () => {
if (this.debug) console.log("Removing abortable: " + id);
this.attemptAbortables.delete(entry);
if (this.debug) console.log("Removing async leak: " + id);
this.asyncLeaks.delete(entry);
}
}
@ -210,7 +213,7 @@ class Core extends EventEmitter {
const socket = net.connect(port,address);
socket.setNoDelay(true);
const cancelAbortable = this.addAbortable(() => socket.destroy());
const cancelAsyncLeak = this.addAsyncLeak(() => socket.destroy());
if(this.debug) {
console.log(address+':'+port+" TCP Connecting");
@ -242,21 +245,21 @@ class Core extends EventEmitter {
);
return await fn(socket);
} finally {
cancelAbortable();
cancelAsyncLeak();
socket.destroy();
}
}
setTimeout(callback, time) {
let cancelAbortable;
let cancelAsyncLeak;
const onTimeout = () => {
cancelAbortable();
cancelAsyncLeak();
callback();
};
const timeout = setTimeout(onTimeout, time);
cancelAbortable = this.addAbortable(() => clearTimeout(timeout));
cancelAsyncLeak = this.addAsyncLeak(() => clearTimeout(timeout));
return () => {
cancelAbortable();
cancelAsyncLeak();
clearTimeout(timeout);
}
}
@ -351,6 +354,18 @@ class Core extends EventEmitter {
_udpIncoming(buffer) {
this.udpCallback && this.udpCallback(buffer);
}
request(params) {
const promise = requestAsync({
...params,
timeout: this.options.socketTimeout
});
const cancelAsyncLeak = this.addAsyncLeak(() => {
promise.cancel();
});
promise.finally(cancelAsyncLeak);
return promise;
}
}
module.exports = Core;