manage sockets by nodes, supporting multiplexing

This commit is contained in:
srgooglo 2020-10-23 16:13:29 +02:00
parent c1b764be06
commit f7535ae7c0
5 changed files with 100 additions and 134 deletions

View File

@ -15,6 +15,7 @@ module.exports = {
app_settings_storage: 'app_settings',
endpoint_global: 'https://comty.pw',
endpoint_websocket: 'eu_es01.ragestudio.net',
proxy_local: 'http://localhost:8000',
session_token_storage: 'cid',

View File

@ -9,24 +9,33 @@ const stateCodes = {
2: "connecting",
3: "disconnected"
}
interface ioConnTypes {
payload: any;
listenerKey: string;
toState: number;
event: any;
attempNumber: number;
startTime: number;
}
export default class SocketConnection {
ioConn: any
state: { connAttemps: number; registeredNamespaces: any; connectionState: any; listeners: any; latency: any; namespace: any; }
props: any
opts: any
dispatcher: any;
namespaceConnector: (namespace: any) => void
then: any;
namespaceConnector: any;
constructor(props: any) {
if (!props) {
throw new Error("Mmm some props are not defined")
}
this.then = props.then
this.props = props.payload
this.dispatcher = props.connector
this.state = {
namespace: "/",
namespaceOrigin: this.props.node ?? "/",
latency: 0,
listeners: {},
connectionState: "init",
@ -46,7 +55,6 @@ export default class SocketConnection {
pingInterval: 5000,
autoConnect: true,
query: {},
// options of the Engine.IO client
upgrade: true,
forceJSONP: false,
jsonp: true,
@ -61,7 +69,6 @@ export default class SocketConnection {
onlyBinaryUpgrades: false,
requestTimeout: 0,
protocols: [],
// options for Node.js
agent: false,
pfx: null,
key: null,
@ -73,51 +80,59 @@ export default class SocketConnection {
perMessageDeflate: true,
forceNode: false,
localAddress: null,
// options for Node.js / React Native
extraHeaders: {},
extraHeaders: {}
}
if (typeof (this.props) !== "undefined") {
this.opts = { ...this.opts, ...this.props }
}
this.createConnection().then((e) => {
this.createConnection().then(() => {
this.ioConn.updateConnectionState(2)
this.setConnectionListeners()
})
this.namespaceConnector = (namespace: string) => {
this.createConnection(namespace).then(() => {
this.ioConn.updateConnectionState(2)
this.setConnectionListeners()
})
}
}
createConnection(namespace) {
const getNamespace = () => {
console.log(this.opts.hostname)
createConnection(namespace: void) {
const getNode = () => {
const defaultNode = `${this.opts.hostname}${this.state.namespaceOrigin}`
if (typeof (namespace) !== "undefined") {
return namespace
return `${this.opts.hostname}:${this.opts.port}${namespace}`
}
return this.opts.hostname
return defaultNode
}
return new Promise((resolve) => {
console.log(getNamespace())
this.ioConn = io(getNamespace(), this.opts)
this.ioConn = io(getNode(), this.opts)
this.ioConn.updateState = (payload) => {
this.ioConn.updateState = (payload: ioConnTypes) => {
this.state = { ...this.state, ...payload }
this.dispatcher({ type: "socket/updateState", payload: this.state })
const sendBackPayload = { ...this.state, ioConn: this.ioConn, namespaceConnector: this.namespaceConnector }
this.dispatcher({ type: "socket/updateStateFromSocket", node: this.state.namespaceOrigin, payload: sendBackPayload })
}
this.ioConn.updateListener = (listenerKey, toState) => {
this.ioConn.updateListener = (listenerKey: ioConnTypes, toState: ioConnTypes) => {
if (!listenerKey)
return false
const getInvert = () => {
// @ts-ignore
if (this.state.listeners[listenerKey] != null) {
// @ts-ignore
return !this.state.listeners[listenerKey]
} else {
return true // this set activated listener by default if not exist any entries
}
}
let updatedObj = []
let updatedObj: any = []
// @ts-ignore
updatedObj[listenerKey] = toState ?? getInvert()
let updatedState = this.state.listeners
@ -126,25 +141,23 @@ export default class SocketConnection {
this.ioConn.updateState({ listeners: updatedState })
}
this.ioConn.updateConnectionState = (code) => {
this.ioConn.updateConnectionState = (code: number) => {
if (code != null && typeof (code) == "number") {
// @ts-ignore
if (this.state.connectionState !== stateCodes[code]) { // avoiding update innecesary
// @ts-ignore
this.ioConn.updateState({ connectionState: stateCodes[code] })
}
}
}
this.ioConn._emit = (...context) => {
this.ioConn._emit = (...context: any) => {
const listenerKey = context[0]
if (typeof (this.state.listeners[listenerKey]) == "undefined") {
this.ioConn.updateListener(listenerKey, true)
}
if (this.state.listeners[listenerKey] != null && !this.state.listeners[listenerKey]) {
verbosity([`Listener [${listenerKey}] is broked!`], { color: "red" })
// setTimeout(() => {
// this.ioConn.updateListener(listenerKey)
// }, 1000)
return false
}
@ -156,13 +169,13 @@ export default class SocketConnection {
}
setConnectionListeners() {
this.ioConn.on('connect', (event: any) => {
notify.success("You are now online")
verbosity("Connected to socket", event)
this.ioConn.on('connect', () => {
this.ioConn.updateConnectionState(1)
verbosity(["🔌 Connected to socket"])
this.then(this.ioConn) // sending init data
})
this.ioConn.on("connect_error", (event: any) => {
this.ioConn.on("connect_error", (event: ioConnTypes) => {
if (this.state.connectionState !== "connecting") {
this.ioConn.updateConnectionState(2)
}
@ -175,35 +188,35 @@ export default class SocketConnection {
this.state.connAttemps = this.state.connAttemps + 1
})
this.ioConn.on('reconnect', (attemptNumber: number) => {
verbosity(["Connection reconected with (", attemptNumber, ") tries"])
notify.success("You are now online")
this.ioConn.on('reconnect', (attemptNumber: ioConnTypes) => {
verbosity([`[socket_event] reconnect > Connection reconected with (${attemptNumber}) tries`])
this.ioConn.updateConnectionState(1)
})
this.ioConn.on('disconnect', (event) => {
verbosity([`Disconnected from socket >`, event])
this.ioConn.on('disconnect', (event: ioConnTypes) => {
verbosity([`[socket_event] disconnect >`, event])
notify.warn("You are offline")
this.ioConn.updateConnectionState(3)
})
this.ioConn.on('connect_timeout', () => {
notify.warn("Connection timeout")
verbosity([`[socket_event] connect_timeout >`])
this.ioConn.updateConnectionState(3)
})
this.ioConn.on('error', (event: any) => {
verbosity([`New error from socket >`, event])
this.ioConn.on('error', (event: ioConnTypes) => {
verbosity([`[socket_event] error >`, event])
})
this.ioConn.on('updateState', (event: any) => {
this.ioConn.on('updateState', (event: ioConnTypes) => {
this.ioConn.updateState(event)
})
if (typeof (this.ioConn.io.opts.pingInterval) !== "undefined") {
if (typeof (this.ioConn.io.opts.pingInterval) == "number") {
setInterval(() => {
this.ioConn.emit('latency', Date.now(), (startTime) => {
this.ioConn.emit('latency', Date.now(), (startTime: ioConnTypes) => {
// @ts-ignore
const latency = Date.now() - startTime
this.ioConn.updateState({ latency })
})
@ -211,5 +224,4 @@ export default class SocketConnection {
}
}
}
}

View File

@ -17,7 +17,7 @@ export default class SocketDebug extends React.Component {
dispatchSocket(value) {
this.props.dispatch({
type: `socket/initializeSocket`,
type: `socket/createNodeSocket`,
payload: {
address: value
}

View File

@ -56,7 +56,7 @@ export default {
})
dispatch({ type: 'updateFrames' })
dispatch({ type: 'handleValidate' })
dispatch({ type: 'socket/initializeSocket' })
dispatch({ type: 'socket/createNodeSocket' })
dispatch({ type: 'queryAuth' })
dispatch({ type: 'query', payload: { dispatcher: dispatch } })
},
@ -127,7 +127,7 @@ export default {
const socket = yield select(state => state.socket)
const state = yield select(state => state)
},
*logout({ payload }, { call, put, select }) {

View File

@ -16,25 +16,12 @@ import cookie from 'cookie_js'
export default {
namespace: 'socket',
state: {
resolvers: null,
socket_address: "85.251.59.39", //set by default
nodes: {},
socket_address: app_config.endpoint_websocket, //set by default
socket_port: "7000",
ioConn: null,
listeners: {}
},
subscriptions: {
setup({ dispatch }) {
dispatch({ type: 'query' })
},
},
effects: {
*query({ payload }, { call, put, select }) {
const state = yield select(state => state)
yield put({ type: "updateState", payload: { resolvers: state.app.resolvers } })
},
*initializeSocket({ payload, then }, { select, put }) {
*createNodeSocket({ payload, then }, { select, put }) {
const state = yield select(state => state)
let opt = {
hostname: `${state.socket.socket_address}:${state.socket.socket_port}`, // set stated data
@ -46,84 +33,50 @@ export default {
opt = { ...opt, ...payload }
}
const handleThen = () => {
if (typeof (then) !== "undefined") {
return then(true)
}
try {
new SocketConnection({
payload: opt, connector: state.app.dispatcher, then: (data) => {
if (typeof (then) !== "undefined") {
return then(true)
}
}
})
} catch (error) {
verbosity([error])
}
appInterface.notify.proccess("Connecting to server")
yield put({
type: "handleSocket",
payload: new SocketConnection({ payload: opt, connector: state.app.dispatcher, then: handleThen })
})
},
*namespaceConnector({ namespace, then }, { select, put }) {
const state = yield select(state => state.socket)
const hostname = `${state.socket_address}:${state.socket_port}/${namespace}`
state.ioConn.disconnect()
yield put({ type: "initializeSocket", payload: { hostname } })
},
*break({ listener }, { select, put }) {
const state = yield select(state => state.socket)
state.ioConn.updateListener(listener, false)
},
*resume({ listener }, { select, put }) {
const state = yield select(state => state.socket)
state.ioConn.updateListener(listener, true)
},
*toogleListener({ listener }, { select, put }) {
const state = yield select(state => state.socket)
state.ioConn.updateListener(listener)
},
*getLatency({ payload }, { select, put }) {
const state = yield select(state => state.socket)
state.ioConn.emit('latency', Date.now(), (startTime) => {
const latency = Date.now() - startTime
verbosity(latency)
})
},
*floodTest({ ticks, offset }, { call, put, select }) {
const state = yield select(state => state)
if (ticks == null) {
ticks = 300
*namespaceConnector({ namespace, node }, { select, put }) {
if (!node || !namespace) {
verbosity(`cannot connect to a namespace without declaring the namespace/node`)
return false
}
const tickSound = new Howl({
preload: true,
html5: true,
src: ["https://dl.ragestudio.net/tick.wav"]
})
const endSound = new Howl({
preload: true,
html5: true,
src: ["https://dl.ragestudio.net/tickUp.wav"]
})
state.socket.ioConn._emit("floodTest", offset ?? Number(0)) // start flood
state.socket.ioConn.on('floodTest', (e: any) => {
const n = e + 1
const canTick = n < (ticks + 1)
verbosity([`floodTest (recived)=> ${e} | sending => ${n}`])
if (canTick) {
setTimeout(() => {
state.socket.ioConn._emit("floodTest", n)
tickSound.play()
}, n)
} else {
endSound.play()
}
})
const state = yield select(state => state.socket)
state.nodes[node].namespaceConnector(`/${namespace}`)
},
*break({ listener, node }, { select, put }) {
if (!node || !listener) {
verbosity(`cannot change a listener without declaring the node/listener`)
return false
}
const state = yield select(state => state.socket)
state.nodes[node].ioConn.updateListener(listener, false)
},
*resume({ listener, node }, { select, put }) {
if (!node || !listener) {
verbosity(`cannot change a listener without declaring the node/listener`)
return false
}
const state = yield select(state => state.socket)
state.nodes[node].ioConn.updateListener(listener, true)
},
*toogleListener({ listener, node }, { select, put }) {
if (!node || !listener) {
verbosity(`cannot change a listener without declaring the node/listener`)
return false
}
const state = yield select(state => state.socket)
state.nodes[node].ioConn.updateListener(listener)
}
},
reducers: {
updateState(state, { payload }) {
@ -132,8 +85,8 @@ export default {
...payload,
};
},
handleSocket(state, { payload }) {
state.ioConn = payload.ioConn
updateStateFromSocket(state, { payload, node }) {
state.nodes[node] = payload
},
},
};