'use strict'
const WebSocket = require('ws')
const { fetchApi } = require('./util/fetch')
/**
* @typedef {object} NetmonEntry
* @property {Object} request
* @property {Object} response
* @property {integer} startedDateTime
* @property {integer} duration
*/
/**
* A connection to netdump running on an instance.
*
* Instances of this class
* are returned from {@link Instance#netdump} and {@link Instance#newNetdump}. They
* should not be created using the constructor.
* @hideconstructor
*/
class Netdump {
constructor (instance) {
this.instance = instance
this.connected = false
this.connectPromise = null
this.id = 0
this.handler = null
this._keepAliveTimeout = null
this._lastPong = null
this._lastPing = null
}
/**
* Ensure netdump is connected.
* @private
*/
async connect () {
this.pendingConnect = true
if (!this.connected) await this.reconnect()
}
/**
* Ensure netdump is disconnected, then connect netdump.
* @private
*/
async reconnect () {
if (this.connected) this.disconnect()
if (this.connectPromise) return this.connectPromise
this.connectPromise = (async () => {
while (this.pendingConnect) {
try {
await this._connect()
break
} catch (e) {
await new Promise(resolve => setTimeout(resolve, 1000))
}
}
this.connectPromise = null
})()
return this.connectPromise
}
async _connect () {
const endpoint = await this.instance.netdumpEndpoint()
// Detect if a disconnection happened before we were able to get netdump endpoint.
if (!this.pendingConnect) throw new Error('connection cancelled')
const ws = new WebSocket(
/^https/.test(endpoint)
? endpoint.replace(/^https/, 'wss')
: /^http/.test(endpoint)
? endpoint.replace(/^http/, 'ws')
: endpoint
)
this.ws = ws
ws.on('message', data => {
try {
let message
if (typeof data === 'string') {
message = JSON.parse(data)
} else if (data.length >= 8) {
message = data.slice(8)
}
if (this.handler) {
this.handler(message)
}
} catch (err) {
console.error('error in agent message handler', err)
}
})
ws.on('close', () => {
this._disconnect()
})
await new Promise((resolve, reject) => {
ws.once('open', () => {
if (this.ws !== ws) {
try {
ws.close()
} catch (e) {
// Swallow ws.close() errors.
}
reject(new Error('connection cancelled'))
return
}
ws.on('error', err => {
if (this.ws === ws) {
this._disconnect()
} else {
try {
ws.close()
} catch (e) {
// Swallow ws.close() errors.
}
}
console.error('error in netdump socket', err)
})
resolve()
})
ws.once('error', err => {
if (this.ws === ws) {
this._disconnect()
} else {
try {
ws.close()
} catch (e) {
// Swallow ws.close() errors.
}
}
reject(err)
})
})
this.connected = true
this._startKeepAlive()
}
_startKeepAlive () {
if (!this.connected) return
const ws = this.ws
ws.ping()
this._keepAliveTimeout = setTimeout(() => {
if (this.ws !== ws) {
try {
ws.close()
} catch (e) {
// Swallow ws.close() errors.
}
return
}
console.error('Netdump did not get a response to pong in 10 seconds, disconnecting.')
this._disconnect()
}, 10000)
ws.once('pong', async () => {
if (ws !== this.ws) return
clearTimeout(this._keepAliveTimeout)
this._keepAliveTimeout = null
await new Promise(resolve => setTimeout(resolve, 10000))
this._startKeepAlive()
})
}
_stopKeepAlive () {
if (this._keepAliveTimeout) {
clearTimeout(this._keepAliveTimeout)
this._keepAliveTimeout = null
}
}
/**
* Disconnect netdump connection. This is usually only required if a new
* netdump connection has been created and is no longer needed
* @example
* netdump.disconnect();
*/
disconnect () {
this.pendingConnect = false
this._disconnect()
}
_disconnect () {
this.connected = false
this.handler = null
this._stopKeepAlive()
if (this.ws) {
try {
this.ws.close()
} catch (e) {
// Swallow ws.close() errors.
}
this.ws = null
}
}
/** Start netdump
* @example
* let netdump = await instance.newNetdump();
* netdump.start();
*/
async start () {
await this.connect()
await this._fetch('/netdump/enable', { method: 'POST' })
await this.instance._waitFor(() => {
return this.instance.info.netdump && this.instance.info.netdump.enabled
})
return true
}
/** Set message handler
* @param {NetworkMonitorProcessMap~newEntryCallback} handler - the callback for captured entry
* @example
* let netdump = await instance.newNetdump();
* netdump.handleMessage((message) => {
* if (Buffer.isBuffer(message)) {
* console.log(message.toString())
* } else {
* console.log(message)
* }
* });
*/
handleMessage (handler) {
this.handler = handler
}
/** Clear captured netdump data
* @example
* let netdump = await instance.newNetdump();
* netdump.clearLog();
*/
async clearLog () {
let disconnectAfter = false
if (!this.connected) {
await this.connect()
disconnectAfter = true
}
await this.ws.send(JSON.stringify({ type: 'clear' }))
if (disconnectAfter) {
await this.disconnect()
}
}
/** Stop Netdump
* @example
* let netdump = await instance.newNetdump();
* netdump.stop();
*/
async stop () {
await this._fetch('/netdump/disable', { method: 'POST' })
await this.disconnect()
await this.instance._waitFor(() => {
return !(this.instance.info.netdump && this.instance.info.netdump.enabled)
})
return (await this.isEnabled()) === false
}
/** Check if netdump is enabled
* @returns {boolean}
* @example
* let enabled = await netdump.isEnabled();
* if (enabled) {
* console.log("enabled");
* } else {
* console.log("disabled");
* }
*/
async isEnabled () {
const info = await fetchApi(this.instance.project, `/instances/${this.instance.id}`)
return info ? (info.netdump ? info.netdump.enabled : false) : false
}
async _fetch (endpoint = '', options = {}) {
return await fetchApi(
this.instance.project,
`/instances/${this.instance.id}${endpoint}`,
options
)
}
}
module.exports = Netdump