// NOTE(review): removed paste artifacts that preceded the source
// ("Spaces:" header and two "Runtime error" log lines) — they were not
// part of the program and made the file syntactically invalid.
// simple express app
const express = require("express");
const cluster = require('node:cluster');
const cors = require("cors");
const cache = require("./cache");
const logger = require('./winston');
const fs = require('fs');
const path = require('path');
const crypto = require('crypto');
const Sentry = require("@sentry/node");
const { ProfilingIntegration } = require("@sentry/profiling-node");

// Directory for on-disk cached file content; it must exist before
// backgroundFetch() opens write streams into it.
const TEMP_FILE_DIR = path.join(__dirname, 'temp_files');
// FIX: this call was commented out, so fs.createWriteStream() in
// backgroundFetch() failed with ENOENT whenever the directory was missing
// (e.g. on a fresh deploy). `recursive: true` makes it a no-op if it exists.
fs.mkdirSync(TEMP_FILE_DIR, { recursive: true });
const CACHE_TIMEOUT = 5 * 60 * 1000; // 5 minutes in milliseconds

// Mutable module-level state, shared across requests within one worker:
// eachfilesize maps CID -> size in bytes (populated per request from the
// size cache and read by customFetch / the range math in the route handler).
var eachfilesize = {};
// URLs currently being downloaded in the background (dedupe guard).
let fetchingQueue = [];
/**
 * Extract the IPFS CID from a gateway URL.
 *
 * Two layouts are recognised:
 *  - subdomain ("dweb") gateways such as https://<cid>.ipfs.dweb.link/...,
 *    where the CID is the first label of the hostname;
 *  - path-style URLs (or bare CIDs), where the CID is the last "/" segment.
 *
 * @param {string} url - The URL from which to extract the CID
 * @return {string} The extracted CID
 */
function extractCID(url) {
  if (!url.includes("dweb")) {
    const segments = url.split("/");
    return segments[segments.length - 1];
  }
  const host = url.split("/")[2];
  return host.split(".")[0];
}
/**
 * Calculate the total size of the files associated with the given content
 * identifiers (CIDs).
 *
 * Issues HEAD requests to the IPFS gateway in parallel (FIX: previously one
 * sequential await per CID) and sums the Content-Length headers.
 * FIX: a missing or unparseable Content-Length now counts as 0 — previously
 * parseInt returned NaN and poisoned the running total.
 *
 * @param {array} cids - content identifiers (bare CIDs or https gateway URLs)
 * @return {Promise<[number, Object<string, number>]>} [totalBytes, bytesPerCid]
 */
async function getFileSize(cids) {
  const sizes = {};
  let total = 0;
  await Promise.all(cids.map(async (raw) => {
    // Accept full gateway URLs as well as bare CIDs.
    const cid = raw.includes("https://") ? extractCID(raw) : raw;
    const res = await fetch(`https://ipfs.particle.network/${cid}`, {
      method: "HEAD",
    });
    const parsed = parseInt(res.headers.get("Content-Length"), 10);
    const bytes = Number.isNaN(parsed) ? 0 : parsed;
    sizes[cid] = bytes;
    total += bytes;
  }));
  return [total, sizes];
}
/**
 * Formats the given number of bytes into a human-readable string
 * representation using binary (1024-based) units.
 *
 * @param {number} bytes - The number of bytes to be formatted.
 * @param {number} [decimals=2] - Decimal places to round to (negative clamps to 0).
 * @return {string} The formatted string, e.g. "1.5 KiB"; "0 Bytes" for 0/NaN.
 */
function formatBytes(bytes, decimals = 2) {
  if (!+bytes) return "0 Bytes";
  const UNITS = ["Bytes", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB"];
  const precision = Math.max(decimals, 0);
  const exponent = Math.floor(Math.log(bytes) / Math.log(1024));
  const scaled = bytes / 1024 ** exponent;
  // parseFloat strips trailing zeros left by toFixed ("1.50" -> "1.5").
  return `${parseFloat(scaled.toFixed(precision))} ${UNITS[exponent]}`;
}
/**
 * Create a safe on-disk cache filename for the given URL.
 *
 * The URL is hashed (md5) so arbitrary characters in it cannot escape the
 * cache directory or produce invalid filenames.
 *
 * @param {string} url - The URL to create the filename from
 * @return {string} The safe filename, "cached_" + 32 hex chars
 */
function getCacheFilename(url) {
  const digest = crypto.createHash('md5').update(url).digest('hex');
  return `cached_${digest}`;
}
/**
 * Stream a byte range of `url` into the Express response, preferring the
 * on-disk cache when a fresh copy exists, and falling back to
 * fetchAndCache() when the cache file is missing or stale.
 *
 * (The previous JSDoc described a different signature — headers/controller —
 * that does not match this function.)
 *
 * @param {string} url - upstream gateway URL whose content is being served
 * @param {object} res - Express response; chunks are written here, but the
 *   response is never ended by this function (callers call res.end())
 * @param {number} start - first byte offset to serve
 * @param {number} fileSize - end of the requested range; callers pass an
 *   exclusive end offset (fetch_end in the route handler), so the last byte
 *   actually served is fileSize - 1
 */
async function customFetch(url, res, start, fileSize) {
  logger.info('Fetching ' + url);
  const cacheFilename = getCacheFilename(url);
  const cacheFilePath = path.join(TEMP_FILE_DIR, cacheFilename);
  // Sanity check against the per-CID size table populated by the route
  // handler. NOTE(review): this only logs — an oversized range is still
  // served below; confirm whether it should abort the request instead.
  if (fileSize > eachfilesize[extractCID(url)]) {
    logger.error(`File size are larger than the actual file`);
  }
  // Check for cached content on disk.
  if (fs.existsSync(cacheFilePath)) {
    const fileStats = fs.statSync(cacheFilePath);
    const fileAge = Date.now() - fileStats.mtimeMs;
    if (fileAge < CACHE_TIMEOUT) {
      // Fresh cache hit: stream the requested range straight from disk.
      logger.info(`Serving from cache: ${cacheFilePath}`);
      logger.info(`Start: ${start} Filesize: ${fileSize}`);
      // createReadStream's `end` option is inclusive, so convert the
      // exclusive end offset to a last-byte index.
      fileSize = fileSize - 1;
      let stream = fs.createReadStream(cacheFilePath, { start: start, end: fileSize, highWaterMark: 10 * 1024 * 1024 });
      for await (const chunk of stream) {
        res.write(chunk);
      }
      logger.info('Serving done from cache: ' + cacheFilePath);
    } else {
      // Cache file exists but is older than CACHE_TIMEOUT: refetch upstream
      // (fetchAndCache also schedules a background refresh of the cache file).
      logger.info(`Cache expired for ${cacheFilePath}`);
      await fetchAndCache(url, res, start, fileSize);
    }
  } else {
    logger.info(`Cache not found for ${cacheFilePath}`);
    await fetchAndCache(url, res, start, fileSize);
  }
}
/**
 * Stream `url` from the upstream gateway directly into the response in
 * 10 MiB Range-request chunks, while kicking off a background download that
 * populates the on-disk cache for subsequent requests.
 *
 * @param {string} url - The URL to fetch data from
 * @param {object} res - Express response; chunks are written here, but the
 *   response is never ended by this function (callers call res.end())
 * @param {number} start - first byte offset to fetch
 * @param {number} fileSize - exclusive end offset; the last byte fetched is
 *   fileSize - 1
 */
async function fetchAndCache(url, res, start, fileSize) {
  // Fire-and-forget cache fill. FIX: attach a rejection handler — the old
  // floating promise turned any background-download failure into an
  // unhandled promise rejection (fatal on modern Node).
  backgroundFetch(url).catch((err) => {
    logger.error('Background fetch failed for ' + url + ': ' + err.message);
  });
  logger.info('Raw Fetching ' + url);
  let chunkSize = 10 * 1024 * 1024; // 10 MB!
  for (; start < fileSize; start += chunkSize) {
    // HTTP Range end is inclusive, hence the -1 on both bounds.
    let end = Math.min(start + chunkSize - 1, fileSize - 1);
    let headers = { Range: `bytes=${start}-${end}` };
    let response = await fetch(url, { headers });
    let chunkData = await response.arrayBuffer(); // Or other method for data type
    res.write(new Uint8Array(chunkData));
  }
  logger.info('Fetching done: ' + url);
}
/**
 * Download `url` in full and persist it into the temp-file cache directory.
 * Concurrent downloads of the same URL are deduplicated via `fetchingQueue`.
 *
 * @param {string} url - upstream URL to cache
 * @return {Promise<void>} resolves once the cache file is fully written
 */
async function backgroundFetch(url) {
  if (fetchingQueue.includes(url)) {
    logger.warn('Fetching already in queue: ' + url);
    return;
  }
  logger.info('Adding to queue: ' + url);
  fetchingQueue.push(url);
  try {
    logger.info('Ready to cache: ' + url);
    const response = await fetch(url);
    const chunkData = await response.arrayBuffer(); // Or other method for data type
    const cacheFilename = getCacheFilename(url);
    logger.info("Successfully download cached content: " + url);
    // FIX: end the stream explicitly and wait for 'finish'. The old code
    // ended the stream inside a 'drain' handler, but 'drain' only fires
    // after a write() that returned false — for payloads smaller than the
    // stream's buffer the handler never ran, so the file was never closed
    // and the URL stayed in fetchingQueue forever.
    await new Promise((resolve, reject) => {
      const writeStream = fs.createWriteStream(path.join(TEMP_FILE_DIR, cacheFilename));
      writeStream.on('error', reject);
      writeStream.on('finish', () => {
        logger.info('Buffer drained, data written');
        resolve();
      });
      writeStream.end(Buffer.from(chunkData));
    });
  } finally {
    // FIX: always dequeue, even when the fetch or the write fails —
    // previously a failure left the URL queued permanently, so it could
    // never be retried.
    const idx = fetchingQueue.indexOf(url);
    if (idx !== -1) fetchingQueue.splice(idx, 1);
  }
}
if (cluster.isPrimary) {
  console.log(`Primary ${process.pid} is running`);
  // Fork a fixed pool of 4 workers.
  for (let i = 0; i < 4; i++) {
    cluster.fork();
  }
  cluster.on('exit', function (worker, code, signal) {
    // FIX: the old check used the long-deprecated `worker.suicide` (DEP0007)
    // and reforked only on *deliberate* exits, so a crashed worker was never
    // replaced. Refork whenever the exit was NOT a deliberate disconnect.
    if (!worker.exitedAfterDisconnect) {
      console.log(new Date() + ' Worker exited unexpectedly (code ' + code + ', signal ' + signal + '), forking replacement');
      cluster.fork();
    }
  });
} else {
  const app = express();
  Sentry.init({
    dsn: "https://72e49dab320396a1376f4ec248ccf515@o4505724146089984.ingest.us.sentry.io/4506868413693952",
    integrations: [
      // enable HTTP calls tracing
      new Sentry.Integrations.Http({ tracing: true }),
      // enable Express.js middleware tracing
      new Sentry.Integrations.Express({ app }),
      new ProfilingIntegration(),
    ],
    // Performance Monitoring
    tracesSampleRate: 1.0, // Capture 100% of the transactions
    // Set sampling rate for profiling - this is relative to tracesSampleRate
    profilesSampleRate: 1.0,
  });
  // The request handler must be the first middleware on the app
  app.use(Sentry.Handlers.requestHandler());
  // TracingHandler creates a trace for every incoming request
  app.use(Sentry.Handlers.tracingHandler());
  app.use(
    cors({
      origin: "*",
    })
  );
  /**
   * GET / — stream a shared file (possibly split across multiple IPFS CIDs)
   * to the client, honoring HTTP Range requests.
   * Query params: shared (required share id), filename (optional override
   * for the Content-Disposition filename).
   */
  app.get("/", async (req, res) => {
    const shared = req.query.shared;
    const filename = req.query.filename;
    if (shared) {
      // Look up the share's metadata (CID list, content type, filename),
      // cached for 60s.
      const data = await cache.cache(shared + "metadata", async () => {
        const APIRES = await fetch(
          "https://www.ufsdrive.com/api/reqdata?shared=" + shared
        );
        if (!APIRES.ok) {
          // FIX: the old code called APIRES.status(404).send("Not found"),
          // but APIRES is a fetch Response, which has no Express-style
          // .status() method — that path always threw a TypeError. Signal
          // "not found" by returning null; the guard below answers the request.
          logger.warn("File not found");
          return null;
        }
        return await APIRES.json();
      }, 60);
      if (data == null) {
        // NOTE(review): cache.cache may keep this null for up to 60s
        // (negative caching) — confirm that is acceptable.
        res.status(404).send("Not found");
        return;
      }
      const file = data.data;
      // Resolve total size and per-CID sizes (cached 30s). This assigns the
      // module-level `eachfilesize` table used by customFetch and the range
      // math below.
      var siez;
      [siez, eachfilesize] = await cache.cache(shared + "size", async () => {
        logger.warn("Rebuild cache");
        if (file.profile_picture != "Multipart") {
          return await getFileSize([file.profile_picture]);
        } else {
          return await getFileSize(file.data);
        }
      }, 30);
      // Parse the Range header, if present ("bytes=start-end", end optional).
      var parts, start, end;
      if (req.headers.range != null) {
        parts = req.headers.range.replace(/bytes=/, "").split("-");
        start = parseInt(parts[0], 10);
        end = parts[1] ? parseInt(parts[1], 10) : siez - 1;
      }
      // Sanitize the stored filename for the Content-Disposition header.
      data.filename = data.filename.replace(/[^a-zA-Z 0-9\.]+/g, "");
      var headers = {
        "content-type": data.contentType,
        "Cache-Control": "public, max-age=29030400",
        "Content-Disposition": `inline; filename="${filename ? filename : data.filename}"`,
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "GET",
        "Access-Control-Expose-Headers":
          "Content-Range, Content-Length, ETag, Access-Control-Allow-Methods, Access-Control-Allow-Origin",
        Vary: "Origin, Access-Control-Request-Headers, Access-Control-Request-Method, Accept-Encoding",
        Connection: "keep-alive",
        "Accept-Ranges": "bytes",
      };
      // Range requests get 206 + Content-Range; full requests get 200.
      if (req.headers.range) {
        headers["Content-Range"] = `bytes ${start}-${end}/${siez}`;
        headers["Content-Length"] = end - start + 1;
      } else {
        headers["Content-Length"] = siez;
      }
      if (req.headers.range) {
        res.writeHead(206, headers);
      } else {
        res.writeHead(200, headers);
      }
      if (req.headers.range) {
        logger.info("Range query");
        if (file.profile_picture != "Multipart") {
          // Single-file share: proxy the client's Range header straight through.
          logger.info("One file range query");
          const res2 = await fetch(
            `https://ipfs.particle.network/${extractCID(file.profile_picture)}`,
            {
              method: "GET",
              headers: {
                Range: req.headers.range,
              },
            }
          );
          const blob = new Uint8Array(await res2.arrayBuffer());
          logger.info("One file range query done");
          res.write(blob);
          res.end();
        } else {
          // Multipart share: walk the ordered CID list and serve only the
          // chunks that overlap the requested byte range.
          logger.info("Multiples file range query");
          var content_range_start = start;
          var content_range_end = end;
          var current_range = 0;
          file.data = await cache.cache(shared + "cids", async () => {
            return file.data.map((cid) => extractCID(cid));
          }, 60 * 60 * 24);
          for (var cid of file.data) {
            // Skip chunks completely outside the range
            if (current_range + eachfilesize[cid] <= content_range_start) {
              current_range += eachfilesize[cid];
              continue;
            }
            // Calculate the starting byte within the current chunk
            let start_byte = Math.max(content_range_start - current_range, 0);
            // Calculate the number of bytes to fetch from this chunk
            let bytes_to_fetch = Math.min(
              eachfilesize[cid] - start_byte,
              content_range_end - current_range
            );
            // Adjust the range for the fetch request
            let fetch_start = start_byte;
            let fetch_end = Math.min(
              start_byte + bytes_to_fetch,
              eachfilesize[cid]
            );
            await customFetch(
              `https://ipfs.particle.network/${cid}`,
              res,
              fetch_start,
              fetch_end
            );
            // Update the current range
            current_range += bytes_to_fetch;
            // Check if the end of the requested range has been reached
            if (current_range >= content_range_end) {
              logger.warn("File size reach larger than content-range-end");
              break;
            }
          }
          logger.info("Multiples file range query done");
          res.end();
        }
      } else {
        logger.info("No range query");
        if (file.profile_picture != "Multipart") {
          // Single-file share, full download via the disk cache.
          logger.info("Req one file");
          await customFetch(
            `https://ipfs.particle.network/${extractCID(file.profile_picture)}`,
            res,
            0,
            eachfilesize[extractCID(file.profile_picture)]
          );
          logger.info("Req one file done");
          res.end();
        } else {
          // Multipart share, full download: stream every chunk in order.
          logger.info("Req full file");
          file.data = await cache.cache(shared + "cids", async () => {
            return file.data.map((cid) => extractCID(cid));
          }, 60 * 60 * 24);
          for (var cid of file.data) {
            await customFetch(
              `https://ipfs.particle.network/${cid}`,
              res,
              0,
              eachfilesize[cid]
            );
          }
          logger.info("Req full file done");
          res.end();
        }
      }
    } else {
      logger.warn("Request fail by missing parameters");
      res.status(400).send("Missing shared or filename parameter");
    }
  });
  app.listen(7860, "0.0.0.0", async () => {
    logger.info("Server started on port 7860");
  });
}