adapted to linebridge 0.16

SrGooglo 2023-11-28 18:59:18 +00:00
parent baabea0f4e
commit 8db47caf69
27 changed files with 419 additions and 1108 deletions

View File

@@ -1,7 +0,0 @@
{
"workspaceId": "64519574a8a691b55e8b361d",
"defaultEnvironment": "dev",
"gitBranchToEnvironmentMapping": {
"master": "prod"
}
}

View File

@@ -3,11 +3,12 @@
"version": "0.58.2",
"main": "dist/index.js",
"scripts": {
"build": "corenode-cli build",
"dev": "cross-env NODE_ENV=development nodemon --ignore dist/ --exec corenode-node ./src/index.js",
"build": "hermes build",
"dev": "cross-env NODE_ENV=development nodemon --ignore dist/ --exec hermes-node ./src/index.js",
"run:prod": "cross-env NODE_ENV=production node ./dist/index.js"
},
"shared": {
"classes/RTEngineServer": "src/shared-classes/RTEngineServer",
"classes/FileUpload": "src/shared-classes/FileUpload",
"classes/CacheService": "src/shared-classes/CacheService",
"classes/ComtyClient": "src/shared-classes/ComtyClient",
@@ -19,51 +20,34 @@
},
"license": "MIT",
"dependencies": {
"@corenode/utils": "0.28.26",
"@foxify/events": "^2.1.0",
"@jimp/plugin-scale": "^0.22.7",
"@octokit/rest": "^19.0.7",
"@socket.io/cluster-adapter": "^0.2.2",
"@socket.io/redis-adapter": "^8.2.1",
"@socket.io/redis-emitter": "^5.1.0",
"@socket.io/sticky": "^1.0.3",
"@tensorflow/tfjs-node": "4.0.0",
"axios": "^1.2.5",
"bcrypt": "^5.1.0",
"busboy": "^1.6.0",
"comty.js": "^0.58.2",
"connect-mongo": "^4.6.0",
"content-range": "^2.0.2",
"corenode": "0.28.26",
"dicebar_lib": "1.0.1",
"dotenv": "^16.0.3",
"fluent-ffmpeg": "^2.1.2",
"formidable": "^2.1.1",
"infisical-node": "^1.2.1",
"jimp": "^0.16.2",
"jsonwebtoken": "^9.0.0",
"linebridge": "0.15.13",
"linebridge": "0.16.0",
"luxon": "^3.2.1",
"merge-files": "^0.1.2",
"mime-types": "^2.1.35",
"minio": "^7.0.32",
"moment": "^2.29.4",
"moment-timezone": "^0.5.40",
"mongoose": "^6.9.0",
"music-metadata": "^8.1.3",
"normalize-url": "^8.0.0",
"nsfwjs": "2.4.2",
"p-map": "4",
"p-queue": "^7.3.4",
"passport": "^0.6.0",
"passport-jwt": "^4.0.1",
"passport-local": "^1.0.0",
"path-to-regexp": "^6.2.1",
"peer": "^1.0.0",
"sharp": "^0.31.3",
"split-chunk-merge": "^1.0.0"
"path-to-regexp": "^6.2.1"
},
"devDependencies": {
"@ragestudio/hermes": "^0.1.0",
"@corenode/utils": "0.28.26",
"chai": "^4.3.7",
"cross-env": "^7.0.3",
"mocha": "^10.2.0",

View File

@@ -1,314 +0,0 @@
import path from "path"
import fs from "fs"
import { Server } from "linebridge/dist/server"
import express from "express"
import bcrypt from "bcrypt"
import passport from "passport"
import jwt from "jsonwebtoken"
import EventEmitter from "@foxify/events"
import { User, Session, Config } from "@shared-classes/DbModels"
import DbManager from "@shared-classes/DbManager"
import RedisClient from "@shared-classes/RedisClient"
import StorageClient from "@shared-classes/StorageClient"
import internalEvents from "./events"
const ExtractJwt = require("passport-jwt").ExtractJwt
const LocalStrategy = require("passport-local").Strategy
global.signLocation = process.env.signLocation
export default class API {
server = global.server = new Server({
name: "Main-API",
minimal: true,
listen_port: process.env.MAIN_LISTEN_PORT ?? 3000,
onWSClientConnection: (...args) => {
this.onWSClientConnection(...args)
},
onWSClientDisconnect: (...args) => {
this.onWSClientDisconnect(...args)
},
},
require("@controllers"),
require("@middlewares"),
{
"Access-Control-Allow-Methods": "GET, POST, PUT, DELETE, OPTIONS",
"Access-Control-Allow-Headers": "Content-Type, Authorization, Content-Length, X-Requested-With, X-Access-Token, X-Refresh-Token, server_token",
"Access-Control-Expose-Headers": "regenerated_token",
},
)
redis = global.redis = RedisClient({
withWsAdapter: true
})
DB = new DbManager()
eventBus = global.eventBus = new EventEmitter()
storage = global.storage = StorageClient()
jwtStrategy = global.jwtStrategy = {
jwtFromRequest: ExtractJwt.fromAuthHeaderAsBearerToken(),
secretOrKey: process.env.SERVER_TOKEN ?? "secret",
algorithms: ["sha1", "RS256", "HS256"],
expiresIn: process.env.signLifetime ?? "1h",
enforceRegenerationTokenExpiration: false,
}
constructor() {
this.server.engine_instance.use(express.json())
this.server.engine_instance.use(express.urlencoded({ extended: true }))
this.server.websocket_instance["clients"] = []
this.server.websocket_instance["findUserIdFromClientID"] = (searchClientId) => {
return this.server.websocket_instance.clients.find(client => client.id === searchClientId)?.userId ?? false
}
this.server.websocket_instance["getClientSockets"] = (userId) => {
return this.server.websocket_instance.clients.filter(client => client.userId === userId).map((client) => {
return client?.socket
})
}
this.server.websocket_instance["broadcast"] = async (channel, ...args) => {
for await (const client of this.server.websocket_instance.clients) {
client.socket.emit(channel, ...args)
}
}
global.websocket_instance = this.server.websocket_instance
global.uploadCachePath = process.env.uploadCachePath ?? path.resolve(process.cwd(), "cache")
if (!fs.existsSync(global.uploadCachePath)) {
fs.mkdirSync(global.uploadCachePath, {
recursive: true,
})
}
global.DEFAULT_POSTING_POLICY = {
maxMessageLength: 512,
acceptedMimeTypes: [
"application/octet-stream",
"image/jpg",
"image/jpeg",
"image/png",
"image/gif",
"audio/mp3",
"audio/mpeg",
"audio/ogg",
"audio/wav",
"audio/flac",
"video/mp4",
"video/mkv",
"video/webm",
"video/quicktime",
"video/x-msvideo",
"video/x-ms-wmv",
],
maximumFileSize: 80 * 1024 * 1024,
maximunFilesPerRequest: 20,
}
// register internal events
for (const [eventName, eventHandler] of Object.entries(internalEvents)) {
this.eventBus.on(eventName, eventHandler)
}
}
events = internalEvents
async initialize() {
await this.redis.initialize()
await this.DB.initialize()
await this.initializeConfigDB()
await this.storage.initialize()
await this.checkSetup()
await this.initPassport()
await this.initWebsockets()
await this.server.initialize()
}
initializeConfigDB = async () => {
let serverConfig = await Config.findOne({ key: "server" }).catch(() => {
return false
})
if (!serverConfig) {
serverConfig = new Config({
key: "server",
value: {
setup: false,
},
})
await serverConfig.save()
}
}
checkSetup = async () => {
return new Promise(async (resolve, reject) => {
let setupOk = (await Config.findOne({ key: "server" })).value?.setup ?? false
if (!setupOk) {
console.log("⚠️ Server setup is not complete, running setup proccess.")
let setupScript = await import("./setup")
setupScript = setupScript.default ?? setupScript
try {
for await (let script of setupScript) {
await script()
}
console.log("✅ Server setup complete.")
await Config.updateOne({ key: "server" }, { value: { setup: true } })
return resolve()
} catch (error) {
console.log("❌ Server setup failed.")
console.error(error)
process.exit(1)
}
}
return resolve()
})
}
initPassport() {
this.server.middlewares["useJwtStrategy"] = (req, res, next) => {
req.jwtStrategy = this.jwtStrategy
next()
}
passport.use(new LocalStrategy({
usernameField: "username",
passwordField: "password",
session: false
}, (username, password, done) => {
// check if username is an email with regex
let isEmail = username.match(/^[^\s@]+@[^\s@]+\.[^\s@]+$/)
let query = isEmail ? { email: username } : { username: username }
User.findOne(query).select("+password")
.then((data) => {
if (data === null) {
return done(null, false, this.jwtStrategy)
} else if (!bcrypt.compareSync(password, data.password)) {
return done(null, false, this.jwtStrategy)
}
// create a token
return done(null, data, this.jwtStrategy, { username, password })
})
.catch(err => done(err, null, this.jwtStrategy))
}))
this.server.engine_instance.use(passport.initialize())
}
initWebsockets() {
const onAuthenticated = async (socket, userData) => {
await this.attachClientSocket(socket, userData)
return socket.emit("authenticated")
}
const onAuthenticatedFailed = async (socket, error) => {
await this.detachClientSocket(socket)
return socket.emit("authenticateFailed", {
error,
})
}
if (this.redis.ioAdapter) {
this.server.websocket_instance.io.adapter(this.redis.ioAdapter)
}
this.server.websocket_instance.eventsChannels.push(["/main", "ping", async (socket) => {
return socket.emit("pong")
}])
this.server.websocket_instance.eventsChannels.push(["/main", "authenticate", async (socket, authPayload) => {
if (!authPayload) {
return onAuthenticatedFailed(socket, "missing_auth_payload")
}
const session = await Session.findOne({ token: authPayload.token }).catch((err) => {
return false
})
if (!session) {
return onAuthenticatedFailed(socket, "Session not found")
}
await jwt.verify(authPayload.token, this.jwtStrategy.secretOrKey, async (err, decoded) => {
if (err) {
return onAuthenticatedFailed(socket, err)
}
const userData = await User.findById(decoded.user_id).catch((err) => {
return false
})
if (!userData) {
return onAuthenticatedFailed(socket, "User not found")
}
return onAuthenticated(socket, userData)
})
}])
}
onWSClientConnection = async (socket) => {
console.log(`🌐 Client connected: ${socket.id}`)
}
onWSClientDisconnect = async (socket) => {
console.log(`🌐 Client disconnected: ${socket.id}`)
this.detachClientSocket(socket)
}
attachClientSocket = async (socket, userData) => {
const client = this.server.websocket_instance.clients.find(c => c.id === socket.id)
if (client) {
client.socket.disconnect()
}
const clientObj = {
id: socket.id,
socket: socket,
user_id: userData._id.toString(),
}
this.server.websocket_instance.clients.push(clientObj)
console.log(`📣 Client [${socket.id}] authenticated as ${userData.username}`)
this.eventBus.emit("user.connected", clientObj.user_id)
}
detachClientSocket = async (socket) => {
const client = this.server.websocket_instance.clients.find(c => c.id === socket.id)
if (client) {
this.server.websocket_instance.clients = this.server.websocket_instance.clients.filter(c => c.id !== socket.id)
console.log(`📣🔴 Client [${socket.id}] authenticated as ${client.user_id} disconnected`)
this.eventBus.emit("user.disconnected", client.user_id)
}
}
}

View File

@@ -23,10 +23,10 @@ export default {
await user.save()
global.websocket_instance.io.emit(`user.update`, {
global.engine.ws.io.of("/").emit(`user.update`, {
...user.toObject(),
})
global.websocket_instance.io.emit(`user.update.${targetUserId}`, {
global.engine.ws.io.of("/").emit(`user.update.${targetUserId}`, {
...user.toObject(),
})
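The same substitution recurs in every event service touched by this commit; as a minimal before/after sketch (the channel name and payload variable here are illustrative):

// linebridge 0.15: emit through the shared websocket instance
global.websocket_instance.io.emit("user.update", payload)

// linebridge 0.16: emit through the engine's socket.io server on the root namespace
global.engine.ws.io.of("/").emit("user.update", payload)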

View File

@@ -1,32 +1,39 @@
import passport from "passport"
import { Token } from "@lib"
import Token from "@lib/token"
import { User } from "@shared-classes/DbModels"
import bcrypt from "bcrypt"
export default {
method: "POST",
route: "/login",
fn: async (req, res) => {
passport.authenticate("local", { session: false }, async (error, user, options) => {
if (error) {
return res.status(500).json({
message: `Error validating user > ${error.message}`,
})
}
const { username, password } = req.body
if (!user) {
return res.status(401).json({
message: "Invalid credentials",
})
}
let isEmail = username.match(/^[^\s@]+@[^\s@]+\.[^\s@]+$/)
const token = await Token.createNewAuthToken({
username: user.username,
user_id: user._id.toString(),
ip_address: req.headers["x-forwarded-for"]?.split(",")[0] ?? req.socket.remoteAddress,
client: req.headers["user-agent"],
signLocation: global.signLocation,
}, options)
let query = isEmail ? { email: username } : { username: username }
return res.json({ token: token })
})(req, res)
const user = await User.findOne(query).select("+password")
if (!user) {
return res.status(401).json({
message: "Invalid credentials, user not found",
})
}
if (!bcrypt.compareSync(password, user.password)) {
return res.status(401).json({
message: "Invalid credentials",
})
}
const token = await Token.createAuth({
username: user.username,
user_id: user._id.toString(),
ip_address: req.headers["x-forwarded-for"]?.split(",")[0] ?? req.socket.remoteAddress,
client: req.headers["user-agent"],
signLocation: global.signLocation,
})
return res.json({ token: token })
}
}
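For reference, a hedged client-side sketch of calling the rewritten endpoint (the base URL is hypothetical; the username field accepts either a username or an email, and the endpoint responds with { token }, or 401 with a message on invalid credentials):

// POST /login with credentials in the JSON body
const response = await fetch("https://api.example.com/login", {
    method: "POST",
    headers: { "Content-Type": "application/json" },
    body: JSON.stringify({ username: "jane@example.com", password: "secret" }),
})

const { token } = await response.json()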

View File

@@ -13,7 +13,7 @@ export default {
const { message } = req.selection
try {
const comment = newComment({
const comment = await newComment({
user_id: req.user._id.toString(),
parent_id: post_id,
message: message,

View File

@@ -26,8 +26,7 @@ export default async (payload) => {
await comment.delete()
global.websocket_instance.io.emit(`comment.delete.${comment_id}`)
global.websocket_instance.io.emit(`post.delete.comment.${comment.parent_id.toString()}`, comment_id)
global.engine.ws.io.of("/").emit(`post.delete.comment.${comment.parent_id.toString()}`, comment_id)
return comment
}

View File

@@ -25,12 +25,7 @@ export default async (payload) => {
const userData = await User.findById(user_id)
global.websocket_instance.io.emit(`comment.new.${parent_id}`, {
...comment.toObject(),
user: userData.toObject(),
})
global.websocket_instance.io.emit(`post.new.comment.${parent_id}`, {
global.engine.ws.io.of("/").emit(`post.new.comment.${parent_id}`, {
...comment.toObject(),
user: userData.toObject(),
})

View File

@@ -1,268 +0,0 @@
import fs from "fs"
import { Controller } from "linebridge/dist/server"
import ChunkedUpload from "@lib/chunkedUpload"
import uploadBodyFiles from "./services/uploadBodyFiles"
import { videoTranscode } from "@lib/videoTranscode"
import Jimp from "jimp"
const maximuns = {
imageResolution: {
width: 3840,
height: 2160,
},
imageQuality: 80,
}
async function processVideo(file, params = {}) {
const result = await videoTranscode(file.filepath, global.uploadCachePath, {
videoCodec: "libx264",
format: "mp4",
...params
})
file.filepath = result.filepath
file.filename = result.filename
return file
}
async function processImage(file) {
const { width, height } = await new Promise((resolve, reject) => {
Jimp.read(file.filepath)
.then((image) => {
resolve({
width: image.bitmap.width,
height: image.bitmap.height,
})
})
.catch((err) => {
reject(err)
})
})
if (width > maximuns.imageResolution.width || height > maximuns.imageResolution.height) {
await new Promise((resolve, reject) => {
// calculate max resolution respecting aspect ratio
const resizedResolution = {
width: maximuns.imageResolution.width,
height: maximuns.imageResolution.height,
}
if (width > height) {
resizedResolution.height = Math.floor((height / width) * maximuns.imageResolution.width)
}
if (height > width) {
resizedResolution.width = Math.floor((width / height) * maximuns.imageResolution.height)
}
Jimp.read(file.filepath)
.then((image) => {
image
.resize(resizedResolution.width, resizedResolution.height)
.quality(maximuns.imageQuality)
.write(file.filepath, resolve)
})
.catch((err) => {
reject(err)
})
})
}
return file
}
export default class FilesController extends Controller {
static refName = "FilesController"
static useRoute = "/files"
chunkUploadEngine = new ChunkedUpload({
tmpPath: global.uploadCachePath,
outputPath: global.uploadCachePath,
maxFileSize: global.DEFAULT_POSTING_POLICY.maximumFileSize,
acceptedMimeTypes: global.DEFAULT_POSTING_POLICY.acceptedMimeTypes,
onExceedMaxFileSize: (req) => {
// check if user has permission to upload big files
if (!req.user) {
return false
}
return req.user.roles.includes("admin") || req.user.roles.includes("moderator") || req.user.roles.includes("developer")
}
})
fileTransformer = {
"video/avi": processVideo,
"video/quicktime": processVideo,
"video/mp4": processVideo,
"video/webm": processVideo,
"image/jpeg": processImage,
"image/png": processImage,
"image/gif": processImage,
"image/bmp": processImage,
"image/tiff": processImage,
"image/webp": processImage,
"image/jfif": processImage,
}
httpEndpoints = {
get: {
"/objects": {
middlewares: ["withAuthentication"],
fn: async (req, res) => {
const user_id = req.user.id
let totalSize = 0
const objectsPath = `${user_id}/`
const objects = await new Promise((resolve, reject) => {
const objects = []
const objectsStream = global.storage.listObjects(global.storage.defaultBucket, objectsPath, true)
objectsStream.on("data", (obj) => {
objects.push(obj)
})
objectsStream.on("error", (err) => {
return reject(err)
})
objectsStream.on("end", () => {
return resolve(objects)
})
})
for await (const object of objects) {
totalSize += object.size
}
return res.json({
totalSize,
objects,
})
}
}
},
post: {
"/upload_chunk": {
middlewares: ["withAuthentication", this.chunkUploadEngine.makeMiddleware()],
fn: async (req, res) => {
if (!req.isLastPart) {
return res.json({
status: "ok",
filePart: req.filePart,
lastPart: req.isLastPart,
})
}
if (!req.fileResult) {
return res.status(500).json({
error: "File upload failed",
})
}
console.log(req.fileResult)
try {
// check if mimetype has transformer
if (typeof this.fileTransformer[req.fileResult.mimetype] === "function") {
req.fileResult = await this.fileTransformer[req.fileResult.mimetype](req.fileResult)
}
} catch (error) {
console.log(error)
return res.status(500).json({
error: "File upload failed on transformation",
reason: error.message,
})
}
// start upload to s3
const remoteUploadPath = req.user?._id ? `${req.user?._id.toString()}/${req.fileResult.filename}` : file.filename
const remoteUploadResponse = await new Promise((_resolve, _reject) => {
try {
const fileStream = fs.createReadStream(req.fileResult.filepath)
fs.stat(req.fileResult.filepath, (err, stats) => {
try {
if (err) {
return _reject(new Error(`Failed to upload file to storage server > ${err.message}`))
}
global.storage.putObject(global.storage.defaultBucket, remoteUploadPath, fileStream, stats.size, req.fileResult, (err, etag) => {
if (err) {
return _reject(new Error(`Failed to upload file to storage server > ${err.message}`))
}
return _resolve({
etag,
})
})
} catch (error) {
return _reject(new Error(`Failed to upload file to storage server > ${error.message}`))
}
})
} catch (error) {
return _reject(new Error(`Failed to upload file to storage server > ${error.message}`))
}
}).catch((err) => {
res.status(500).json({
error: err.message,
})
return false
})
if (!remoteUploadResponse) {
return false
}
try {
// remove file from cache
await fs.promises.unlink(req.fileResult.filepath)
// programmatically remove file from cache
} catch (error) {
console.log("Failed to remove file from cache", error)
return res.status(500).json({
error: error.message,
})
}
// get url location
const remoteUrlObj = global.storage.composeRemoteURL(remoteUploadPath)
return res.json({
name: req.fileResult.filename,
id: remoteUploadPath,
url: remoteUrlObj,
})
}
},
"/upload": {
middlewares: ["withAuthentication"],
fn: async (req, res) => {
const results = await uploadBodyFiles({
req,
}).catch((err) => {
res.status(400).json({
error: err.message,
})
return false
})
if (results) {
return res.json(results)
}
}
}
}
}
}

View File

@@ -1,247 +0,0 @@
import path from "path"
import fs from "fs"
import { videoTranscode } from "@lib/videoTranscode"
import Jimp from "jimp"
import mime from "mime-types"
import pmap from "@utils/pMap"
const formidable = require("formidable")
const maximuns = {
imageResolution: {
width: 3840,
height: 2160,
},
imageQuality: 80,
}
const handleUploadVideo = async (file, params) => {
const transcoded = await videoTranscode(file.filepath, params.cacheUploadDir)
file.filepath = transcoded.filepath
file.newFilename = path.basename(file.filepath)
return file
}
const handleImage = async (file) => {
const { width, height } = await new Promise((resolve, reject) => {
Jimp.read(file.filepath)
.then((image) => {
resolve({
width: image.bitmap.width,
height: image.bitmap.height,
})
})
.catch((err) => {
reject(err)
})
})
if (width > maximuns.imageResolution.width || height > maximuns.imageResolution.height) {
await new Promise((resolve, reject) => {
Jimp.read(file.filepath)
.then((image) => {
image
.resize(maximuns.imageResolution.width, maximuns.imageResolution.height)
.quality(maximuns.imageQuality)
.write(file.filepath, resolve)
})
.catch((err) => {
reject(err)
})
})
}
file.newFilename = path.basename(file.filepath)
return file
}
export default async (payload) => {
if (!payload) {
throw new Error("Missing payload")
}
const { req } = payload
let params = {
cacheUploadDir: global.uploadCachePath,
maxFileSize: global.DEFAULT_POSTING_POLICY.maximumFileSize,
maxFields: global.DEFAULT_POSTING_POLICY.maximunFilesPerRequest,
acceptedMimeTypes: global.DEFAULT_POSTING_POLICY.acceptedMimeTypes,
}
if (payload.params) {
params = {
...params,
...payload.params,
}
}
const processedFiles = []
const failedFiles = []
let queuePromieses = []
// check directories exist
if (!fs.existsSync(params.cacheUploadDir)) {
await fs.promises.mkdir(params.cacheUploadDir, { recursive: true })
}
// decode body form-data
const form = formidable({
multiples: true,
keepExtensions: true,
uploadDir: params.cacheUploadDir,
maxFileSize: params.maxFileSize,
maxFields: params.maxFields,
filename: (name, ext, part, form) => {
if (!ext) {
ext = `.${mime.extension(part.mimetype)}`
}
name = global.nanoid()
return name + ext
},
filter: (stream) => {
// check if is allowed mime type
if (!params.acceptedMimeTypes.includes(stream.mimetype)) {
failedFiles.push({
fileName: stream.originalFilename,
mimetype: stream.mimetype,
error: "File type not allowed",
})
return false
}
return true
}
})
const results = await new Promise((resolve, reject) => {
// create a new thread for each file
form.parse(req, async (err, fields, data) => {
if (err) {
return reject(err)
}
if (!Array.isArray(data.files)) {
data.files = [data.files]
}
for (let file of data.files) {
if (!file) continue
// create process queue
queuePromieses.push(async () => {
// check if it is a video that needs transcoding
switch (file.mimetype) {
case "video/quicktime": {
file = await handleUploadVideo(file, params)
break
}
case "image/jpeg": {
file = await handleImage(file, params)
break
}
case "image/png": {
file = await handleImage(file, params)
break
}
case "image/gif": {
file = await handleImage(file, params)
break
}
case "image/bmp": {
file = await handleImage(file, params)
break
}
case "image/tiff": {
file = await handleImage(file, params)
break
}
case "image/webp": {
file = await handleImage(file, params)
break
}
case "image/jfif": {
file = await handleImage(file, params)
break
}
default: {
// do nothing
}
}
const metadata = {
mimetype: file.mimetype,
size: file.size,
filepath: file.filepath,
filename: file.newFilename,
}
// upload path must be user_id + file.newFilename
const uploadPath = req.user?._id ? `${req.user?._id.toString()}/${file.newFilename}` : file.newFilename
// upload to s3
await new Promise((_resolve, _reject) => {
global.storage.fPutObject(global.storage.defaultBucket, uploadPath, file.filepath, metadata, (err, etag) => {
if (err) {
return _reject(new Error(`Failed to upload file to storage server > ${err.message}`))
}
return _resolve()
})
}).catch((err) => {
return reject(err)
})
// get url location
const remoteUrlObj = global.storage.composeRemoteURL(uploadPath)
// push final filepath to urls
return {
name: file.originalFilename,
id: uploadPath,
url: remoteUrlObj,
}
})
}
// wait for all files to be processed
await pmap(
queuePromieses,
async (fn) => {
const result = await fn().catch((err) => {
console.error(err)
// FIXME: add fileNames
failedFiles.push({
error: err.message,
})
return null
})
if (result) {
processedFiles.push(result)
}
},
{ concurrency: 10 }
)
return resolve({
files: processedFiles,
failed: failedFiles,
})
})
})
return results
}

View File

@@ -30,10 +30,10 @@ export default async (payload) => {
await newFollow.save()
global.websocket_instance.io.emit(`user.follow`, {
global.engine.ws.io.of("/").emit(`user.follow`, {
...user.toObject(),
})
global.websocket_instance.io.emit(`user.follow.${payload.user_id}`, {
global.engine.ws.io.of("/").emit(`user.follow.${payload.user_id}`, {
...user.toObject(),
})

View File

@@ -25,10 +25,10 @@ export default async (payload) => {
await follow.remove()
global.websocket_instance.io.emit(`user.unfollow`, {
global.engine.ws.io.of("/").emit(`user.unfollow`, {
...user.toObject(),
})
global.websocket_instance.io.emit(`user.unfollow.${payload.user_id}`, {
global.engine.ws.io.of("/").emit(`user.unfollow.${payload.user_id}`, {
...user.toObject(),
})

View File

@@ -36,8 +36,8 @@ export default async (payload) => {
const resultPost = await getPostData({ post_id: post._id.toString() })
global.websocket_instance.io.emit(`post.new`, resultPost)
global.websocket_instance.io.emit(`post.new.${post.user_id}`, resultPost)
global.engine.ws.io.of("/").emit(`post.new`, resultPost)
global.engine.ws.io.of("/").emit(`post.new.${post.user_id}`, resultPost)
// push to background job to check if is NSFW
flagNsfwByAttachments(post._id.toString())

View File

@@ -27,7 +27,7 @@ export default async (payload) => {
}
await post.remove()
global.websocket_instance.io.emit(`post.delete`, post_id)
global.engine.ws.io.of("/").emit(`post.delete`, post_id)
return post.toObject()
}

View File

@@ -24,8 +24,8 @@ export default async (post_id, modification) => {
}
}
global.websocket_instance.io.emit(`post.dataUpdate`, post)
global.websocket_instance.io.emit(`post.dataUpdate.${post_id}`, post)
global.engine.ws.io.of("/").emit(`post.dataUpdate`, post)
global.engine.ws.io.of("/").emit(`post.dataUpdate.${post_id}`, post)
return post
}

View File

@@ -27,7 +27,7 @@ export default async (payload) => {
await PostLike.findByIdAndDelete(likeObj._id)
}
global.websocket_instance.io.emit(`post.${post_id}.likes.update`, {
global.engine.ws.io.of("/").emit(`post.${post_id}.likes.update`, {
to,
post_id,
user_id,

View File

@@ -1,13 +1,12 @@
import { Token } from "@lib"
import Token from "@lib/token"
export default {
method: "POST",
route: "/regenerate",
middlewares: ["useJwtStrategy"],
fn: async (req, res) => {
const { expiredToken, refreshToken } = req.body
const token = await Token.regenerateSession(expiredToken, refreshToken).catch((error) => {
const token = await Token.regenerate(expiredToken, refreshToken).catch((error) => {
res.status(400).json({ error: error.message })
return null

View File

@@ -1,6 +1,4 @@
import jwt from "jsonwebtoken"
import { Session } from "@shared-classes/DbModels"
import Token from "@lib/token"
export default {
method: "POST",
@@ -8,44 +6,7 @@ export default {
fn: async (req, res) => {
const token = req.body.token
let result = {
expired: false,
valid: true,
session: null
}
await jwt.verify(token, global.jwtStrategy.secretOrKey, async (err, decoded) => {
if (err) {
result.valid = false
result.error = err.message
if (err.message === "jwt expired") {
result.expired = true
}
return
}
result = { ...result, ...decoded }
const sessions = await Session.find({ user_id: result.user_id })
const sessionsTokens = sessions.map((session) => {
if (session.user_id === result.user_id) {
return session.token
}
})
if (!sessionsTokens.includes(token)) {
result.valid = false
result.error = "Session token not found"
} else {
result.valid = true
}
})
if (result.valid) {
result.session = await jwt.decode(token)
}
const result = await Token.validate(token)
return res.json(result)
},

View File

@@ -1,25 +1,28 @@
import { UserFollow } from "@shared-classes/DbModels"
export default async (payload = {}) => {
const { from_user_id } = payload
const { from_user_id, limit = 10, offset = 0 } = payload
// TODO: Sort by latest history interaction
// get all the users that are following
const following = await UserFollow.find({
let followingUsersIds = await UserFollow.find({
user_id: from_user_id,
})
// .skip(offset)
// .limit(limit)
// check if following users are connected
const connectedUsers = []
following.forEach((follow) => {
const connectedClient = global.websocket_instance.clients.find((client) => {
return client.user_id === follow.to
})
if (connectedClient) {
connectedUsers.push(connectedClient.user_id)
}
followingUsersIds = followingUsersIds.map((follow) => {
return follow.to
})
return connectedUsers
const searchResult = await global.engine.ws.find.manyById(followingUsersIds)
// TODO: Calculate last session duration or last activity at
return searchResult.map((user) => {
return {
_id: user.user_id,
username: user.username,
}
})
}

View File

@@ -46,9 +46,9 @@ export default {
})
}
global.websocket_instance.io.emit(`streaming.new`, streamingProfile)
global.engine.ws.io.of("/").emit(`streaming.new`, streamingProfile)
global.websocket_instance.io.emit(`streaming.new.${streamingProfile.user_id}`, streamingProfile)
global.engine.ws.io.of("/").emit(`streaming.new.${streamingProfile.user_id}`, streamingProfile)
return res.json({
code: 0,

View File

@@ -11,9 +11,9 @@ export default {
})
if (streamingProfile) {
global.websocket_instance.io.emit(`streaming.end`, streamingProfile)
global.engine.ws.io.of("/").emit(`streaming.end`, streamingProfile)
global.websocket_instance.io.emit(`streaming.end.${streamingProfile.user_id}`, streamingProfile)
global.engine.ws.io.of("/").emit(`streaming.end.${streamingProfile.user_id}`, streamingProfile)
return res.json({
code: 0,

View File

@@ -22,10 +22,10 @@ export default async (payload) => {
await user.save()
global.websocket_instance.io.emit(`user.update`, {
global.engine.ws.io.of("/").emit(`user.update`, {
...user.toObject(),
})
global.websocket_instance.io.emit(`user.update.${payload.user_id}`, {
global.engine.ws.io.of("/").emit(`user.update.${payload.user_id}`, {
...user.toObject(),
})

View File

@ -16,7 +16,6 @@ export { default as StreamingController } from "./StreamingController"
export { default as BadgesController } from "./BadgesController"
export { default as FeaturedEventsController } from "./FeaturedEventsController" // Needs to migrate to lb 0.15
export { default as FilesController } from "./FilesController" // Needs to migrate to lb 0.15
export { default as RolesController } from "./RolesController" // Needs to migrate to lb 0.15
export { default as SearchController } from "./SearchController" // Needs to migrate to lb 0.15

View File

@@ -1,3 +1,153 @@
import path from "path"
import Boot from "linebridge/bootstrap"
import { Server } from "linebridge/dist/server"
require(path.resolve(process.cwd(), "../../shared/lib/api_wrapper"))
import EventEmitter from "@foxify/events"
import { Config, User } from "@shared-classes/DbModels"
import DbManager from "@shared-classes/DbManager"
import RedisClient from "@shared-classes/RedisClient"
import StorageClient from "@shared-classes/StorageClient"
import Token from "@lib/token"
import internalEvents from "./events"
export default class API extends Server {
static refName = "MAIN-API"
static listen_port = 3010
static requireWSAuth = true
constructor(params) {
super(params)
global.DEFAULT_POSTING_POLICY = {
maxMessageLength: 512,
maximumFileSize: 80 * 1024 * 1024,
maximunFilesPerRequest: 20,
}
global.jwtStrategy = {
secretOrKey: process.env.JWT_SECRET,
expiresIn: "1h",
algorithm: "HS256",
enforceRegenerationTokenExpiration: false,
}
}
middlewares = require("@middlewares")
controllers = require("@controllers")
redis = global.redis = RedisClient({
withWsAdapter: true
})
DB = new DbManager()
eventBus = new EventEmitter()
storage = global.storage = StorageClient()
events = internalEvents
async onInitialize() {
for (const [eventName, eventHandler] of Object.entries(internalEvents)) {
this.eventBus.on(eventName, eventHandler)
}
await this.redis.initialize()
await this.DB.initialize()
await this.initializeConfigDB()
await this.storage.initialize()
await this.checkSetup()
}
initializeConfigDB = async () => {
let serverConfig = await Config.findOne({ key: "server" }).catch(() => {
return false
})
if (!serverConfig) {
serverConfig = new Config({
key: "server",
value: {
setup: false,
},
})
await serverConfig.save()
}
}
checkSetup = async () => {
return new Promise(async (resolve, reject) => {
let setupOk = (await Config.findOne({ key: "server" })).value?.setup ?? false
if (!setupOk) {
console.log("⚠️ Server setup is not complete, running setup proccess.")
let setupScript = await import("./setup")
setupScript = setupScript.default ?? setupScript
try {
for await (let script of setupScript) {
await script()
}
console.log("✅ Server setup complete.")
await Config.updateOne({ key: "server" }, { value: { setup: true } })
return resolve()
} catch (error) {
console.log("❌ Server setup failed.")
console.error(error)
process.exit(1)
}
}
return resolve()
})
}
handleWsAuth = async (socket, token, err) => {
try {
const validation = await Token.validate(token)
if (!validation.valid) {
if (validation.error) {
return err(`auth:server_error`)
}
return err(`auth:token_invalid`)
}
const userData = await User.findById(validation.data.user_id).catch((err) => {
console.error(`[${socket.id}] failed to get user data caused by server error`, err)
return null
})
if (!userData) {
return err(`auth:user_failed`)
}
socket.userData = userData
socket.token = token
socket.session = validation.data
return {
token: token,
username: userData.username,
user_id: userData._id,
}
} catch (error) {
return err(`auth:authentification_failed`, error)
}
}
}
Boot(API)
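The new entrypoint follows the linebridge 0.16 bootstrap pattern shown above: a class extending Server carries static metadata, controller and middleware maps, an onInitialize hook and a handleWsAuth callback, and is handed to Boot. A stripped-down sketch of the same shape (class name and port are hypothetical):

import Boot from "linebridge/bootstrap"
import { Server } from "linebridge/dist/server"

class ExampleAPI extends Server {
    static refName = "EXAMPLE-API"   // service identifier
    static listen_port = 3999        // hypothetical port

    middlewares = {}                 // usually require("@middlewares")
    controllers = {}                 // usually require("@controllers")

    async onInitialize() {
        // connect databases, caches and storage before the engine starts listening
    }
}

Boot(ExampleAPI)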

View File

@@ -1,168 +1,217 @@
import jwt from "jsonwebtoken"
import { Session, RegenerationToken } from "@shared-classes/DbModels"
export async function regenerateSession(expiredToken, refreshToken, aggregateData = {}) {
// search for a regeneration token with the expired token (Should exist only one)
const regenerationToken = await RegenerationToken.findOne({ refreshToken: refreshToken })
export default class Token {
static async validate(token) {
if (typeof token === "undefined") {
throw new Error("Token is undefined")
}
if (!regenerationToken) {
throw new Error("Cannot find regeneration token")
let result = {
expired: false,
valid: true,
data: null
}
await jwt.verify(token, global.jwtStrategy.secretOrKey, async (err, decoded) => {
if (err) {
result.valid = false
result.error = err.message
if (err.message === "jwt expired") {
result.expired = true
}
return
}
result = { ...result, ...decoded }
const sessions = await Session.find({ user_id: result.user_id })
const sessionsTokens = sessions.map((session) => {
if (session.user_id === result.user_id) {
return session.token
}
})
if (!sessionsTokens.includes(token)) {
result.valid = false
result.error = "Session token not found"
} else {
result.valid = true
}
})
if (result.valid) {
result.data = await jwt.decode(token)
}
return result
}
// check if the regeneration token is valid and not expired
let decodedRefreshToken = null
let decodedExpiredToken = null
static async regenerate(expiredToken, refreshToken, aggregateData = {}) {
// search for a regeneration token with the expired token (Should exist only one)
const regenerationToken = await RegenerationToken.findOne({ refreshToken: refreshToken })
try {
decodedRefreshToken = jwt.decode(refreshToken)
decodedExpiredToken = jwt.decode(expiredToken)
} catch (error) {
console.error(error)
// TODO: Store this incident
}
if (!regenerationToken) {
throw new Error("Cannot find regeneration token")
}
if (!decodedRefreshToken) {
throw new Error("Cannot decode refresh token")
}
// check if the regeneration token is valid and not expired
let decodedRefreshToken = null
let decodedExpiredToken = null
if (!decodedExpiredToken) {
throw new Error("Cannot decode expired token")
}
try {
decodedRefreshToken = jwt.decode(refreshToken)
decodedExpiredToken = jwt.decode(expiredToken)
} catch (error) {
console.error(error)
// TODO: Store this incident
}
// no need to verify the expired token, since it is supposed to be expired
if (!decodedRefreshToken) {
throw new Error("Cannot decode refresh token")
}
// verify refresh token
await jwt.verify(refreshToken, global.jwtStrategy.secretOrKey, async (err) => {
// check if is expired
if (err) {
if (err.message === "jwt expired") {
// check if server has enabled the enforcement of regeneration token expiration
if (global.jwtStrategy.enforceRegenerationTokenExpiration) {
// delete the regeneration token
await RegenerationToken.deleteOne({ refreshToken: refreshToken })
if (!decodedExpiredToken) {
throw new Error("Cannot decode expired token")
}
throw new Error("Regeneration token expired and cannot be regenerated due server has enabled enforcement security policy")
// no need to verify the expired token, since it is supposed to be expired
// verify refresh token
await jwt.verify(refreshToken, global.jwtStrategy.secretOrKey, async (err) => {
// check if is expired
if (err) {
if (err.message === "jwt expired") {
// check if server has enabled the enforcement of regeneration token expiration
if (global.jwtStrategy.enforceRegenerationTokenExpiration) {
// delete the regeneration token
await RegenerationToken.deleteOne({ refreshToken: refreshToken })
throw new Error("Regeneration token expired and cannot be regenerated due server has enabled enforcement security policy")
}
}
}
// check if the regeneration token is associated with the expired token
if (decodedRefreshToken.expiredToken !== expiredToken) {
throw new Error("Regeneration token is not associated with the expired token")
}
})
// find the session associated with the expired token
const session = await Session.findOne({ token: expiredToken })
if (!session) {
throw new Error("Cannot find session associated with the expired token")
}
// check if the regeneration token is associated with the expired token
if (decodedRefreshToken.expiredToken !== expiredToken) {
throw new Error("Regeneration token is not associated with the expired token")
// generate a new token
const newToken = await createNewAuthToken({
username: decodedExpiredToken.username,
session_uuid: session.session_uuid,
user_id: decodedExpiredToken.user_id,
ip_address: aggregateData.ip_address,
}, {
updateSession: session._id,
})
// delete the regeneration token
await RegenerationToken.deleteOne({ refreshToken: refreshToken })
return newToken
}
static async createAuth(payload, options = {}) {
if (options.updateSession) {
const sessionData = await Session.findOne({ _id: options.updateSession })
payload.session_uuid = sessionData.session_uuid
} else {
payload.session_uuid = global.nanoid()
}
})
// find the session associated with the expired token
const session = await Session.findOne({ token: expiredToken })
const token = jwt.sign({
session_uuid: payload.session_uuid,
username: payload.username,
user_id: payload.user_id,
signLocation: payload.signLocation,
}, global.jwtStrategy.secretOrKey, {
expiresIn: global.jwtStrategy.expiresIn ?? "1h",
algorithm: global.jwtStrategy.algorithm ?? "HS256"
})
if (!session) {
throw new Error("Cannot find session associated with the expired token")
}
// generate a new token
const newToken = await createNewAuthToken({
username: decodedExpiredToken.username,
session_uuid: session.session_uuid,
user_id: decodedExpiredToken.user_id,
ip_address: aggregateData.ip_address,
}, {
updateSession: session._id,
})
// delete the regeneration token
await RegenerationToken.deleteOne({ refreshToken: refreshToken })
return newToken
}
export async function getRegenerationToken(expiredToken) {
const regenerationToken = await RegenerationToken.findOne({ expiredToken }).catch((error) => false)
return regenerationToken ?? false
}
export async function createNewRegenerationToken(expiredToken) {
// check if token is only expired, if is corrupted, reject
let decoded = null
try {
decoded = jwt.decode(expiredToken)
} catch (error) {
console.error(error)
}
if (!decoded) {
return false
}
// check if token exists on a session
const sessions = await Session.find({ user_id: decoded.user_id })
const currentSession = sessions.find((session) => session.token === expiredToken)
if (!currentSession) {
throw new Error("This token is not associated with any session")
}
// create a new refresh token and sign it with maximum expiration time of 1 day
const refreshToken = jwt.sign(
{
expiredToken
},
global.jwtStrategy.secretOrKey,
{
expiresIn: "1d"
const session = {
token: token,
session_uuid: payload.session_uuid,
username: payload.username,
user_id: payload.user_id,
location: payload.signLocation,
ip_address: payload.ip_address,
client: payload.client,
date: new Date().getTime(),
}
)
// create a new regeneration token and save it
const regenerationToken = new RegenerationToken({
expiredToken,
refreshToken,
})
if (options.updateSession) {
await Session.findByIdAndUpdate(options.updateSession, session)
} else {
let newSession = new Session(session)
await regenerationToken.save()
await newSession.save()
}
// return the regeneration token
return regenerationToken
}
export async function createNewAuthToken(payload, options = {}) {
if (options.updateSession) {
const sessionData = await Session.findOne({ _id: options.updateSession })
payload.session_uuid = sessionData.session_uuid
} else {
payload.session_uuid = global.nanoid()
return token
}
const token = jwt.sign({
session_uuid: payload.session_uuid,
username: payload.username,
user_id: payload.user_id,
signLocation: payload.signLocation,
}, global.jwtStrategy.secretOrKey, {
expiresIn: global.jwtStrategy.expiresIn ?? "1h",
algorithm: global.jwtStrategy.algorithm ?? "HS256"
})
static async createRegenerative(expiredToken) {
// check if token is only expired, if is corrupted, reject
let decoded = null
const session = {
token: token,
session_uuid: payload.session_uuid,
username: payload.username,
user_id: payload.user_id,
location: payload.signLocation,
ip_address: payload.ip_address,
client: payload.client,
date: new Date().getTime(),
try {
decoded = jwt.decode(expiredToken)
} catch (error) {
console.error(error)
}
if (!decoded) {
return false
}
// check if token exists on a session
const sessions = await Session.find({ user_id: decoded.user_id })
const currentSession = sessions.find((session) => session.token === expiredToken)
if (!currentSession) {
throw new Error("This token is not associated with any session")
}
// create a new refresh token and sign it with maximum expiration time of 1 day
const refreshToken = jwt.sign(
{
expiredToken
},
global.jwtStrategy.secretOrKey,
{
expiresIn: "1d"
}
)
// create a new regeneration token and save it
const regenerationToken = new RegenerationToken({
expiredToken,
refreshToken,
})
await regenerationToken.save()
// return the regeneration token
return regenerationToken
}
if (options.updateSession) {
await Session.findByIdAndUpdate(options.updateSession, session)
} else {
let newSession = new Session(session)
static async getRegenerationToken(expiredToken) {
const regenerationToken = await RegenerationToken.findOne({ expiredToken })
await newSession.save()
return regenerationToken
}
return token
}
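The former standalone helpers (createNewAuthToken, regenerateSession, createNewRegenerationToken, getRegenerationToken) are now static methods of the Token class. A hedged usage sketch, with illustrative variable names, assuming the same call sites as the auth endpoints above:

import Token from "@lib/token"

async function example(user, req) {
    // sign a token and persist its session (same fields the /login endpoint passes)
    const token = await Token.createAuth({
        username: user.username,
        user_id: user._id.toString(),
        ip_address: req.socket.remoteAddress,
        client: req.headers["user-agent"],
        signLocation: global.signLocation,
    })

    // later: returns { valid, expired, data } plus an error message when invalid
    const result = await Token.validate(token)

    if (result.expired) {
        // exchange the expired token plus its refresh token for a new one
        const regeneration = await Token.createRegenerative(token)
        return await Token.regenerate(token, regeneration.refreshToken)
    }

    return token
}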

View File

@@ -0,0 +1 @@
/opt/comty-federated/@public/shared/classes/RTEngineServer

View File

@@ -1,4 +1,4 @@
import { Token } from "@lib"
import Token from "@lib/token"
export default async (expiredToken) => {
let regenerationToken = null
@@ -10,7 +10,7 @@ export default async (expiredToken) => {
regenerationToken = associatedRegenerationToken.refreshToken
} else {
// create a new regeneration token with the expired token
regenerationToken = await Token.createNewRegenerationToken(expiredToken)
regenerationToken = await Token.createRegenerative(expiredToken)
}
return regenerationToken.refreshToken