// simple express app const express = require("express"); const cluster = require('node:cluster'); const cors = require("cors"); const cache = require("./cache") const logger = require('./winston'); const fs = require('fs'); const path = require('path'); const crypto = require('crypto'); const Sentry = require("@sentry/node"); // import { ProfilingIntegration } from "@sentry/profiling-node"; const { ProfilingIntegration } = require("@sentry/profiling-node"); const TEMP_FILE_DIR = path.join(__dirname, 'temp_files'); const CACHE_TIMEOUT = 5 * 60 * 1000; //5 minutes in milliseconds // fs.mkdirSync(TEMP_FILE_DIR, { recursive: true }); var eachfilesize = {}; let fetchingQueue = [] /** * Extracts the CID from the given URL. * * @param {string} url - The URL from which to extract the CID * @return {string} The extracted CID */ function extractCID(url) { if (url.includes("dweb")) { let parts = url.split("/"); let subParts = parts[2].split("."); return subParts[0]; } else { return url.split("/").pop(); } } /** * Calculate the total size of the files associated with the given content identifiers (CIDs). * * @param {array} cids - An array of content identifiers * @return {number} The total size of the files in bytes */ async function getFileSize(cids) { var eachfilesize = {}; var fileSize = 0; for (var cid of cids) { if (cid.includes("https://")) { cid = extractCID(cid); } const res = await fetch(`https://ipfs.particle.network/${cid}`, { method: "HEAD", }); // console.log(res.headers.get("Content-Length") ) fileSize += parseInt(res.headers.get("Content-Length"), 10); eachfilesize[cid] = parseInt(res.headers.get("Content-Length"), 10); } return [fileSize, eachfilesize]; } /** * Formats the given number of bytes into a human-readable string representation. * * @param {number} bytes - The number of bytes to be formatted. * @param {number} [decimals=2] - The number of decimals to round the result to. * @return {string} The formatted string representation of the input bytes. */ function formatBytes(bytes, decimals = 2) { if (!+bytes) return "0 Bytes"; const k = 1024; const dm = decimals < 0 ? 0 : decimals; const sizes = [ "Bytes", "KiB", "MiB", "GiB", "TiB", "PiB", "EiB", "ZiB", "YiB", ]; const i = Math.floor(Math.log(bytes) / Math.log(k)); return `${parseFloat((bytes / Math.pow(k, i)).toFixed(dm))} ${sizes[i]}`; } /** * Create a safe filename based on the URL (hashing, etc.). * * @param {string} url - The URL to create the filename from * @return {string} The safe filename created from the URL */ function getCacheFilename(url) { // Create a safe filename based on the URL (hashing, etc.) return 'cached_' + crypto.createHash('md5').update(url).digest('hex'); } /** * Asynchronously fetches data in chunks from the specified URL using the given headers and controller. * * @param {string} url - The URL to fetch data from * @param {Object} headers - The headers to include in the fetch request * @param {AbortController} controller - The controller to signal the fetch should be aborted * @param {number} start - The starting byte index for fetching data * @param {number} fileSize - The total size of the file being fetched */ async function customFetch(url, res, start, fileSize) { logger.info('Fetching ' + url); const cacheFilename = getCacheFilename(url); const cacheFilePath = path.join(TEMP_FILE_DIR, cacheFilename); if(fileSize > eachfilesize[extractCID(url)]) { logger.error(`File size are larger than the actual file`); } // Check for cached content if (fs.existsSync(cacheFilePath)) { const fileStats = fs.statSync(cacheFilePath); const fileAge = Date.now() - fileStats.mtimeMs; if (fileAge < CACHE_TIMEOUT) { logger.info(`Serving from cache: ${cacheFilePath}`); // for (; start < fileSize; start += chunkSize) { // let end = Math.min(start + chunkSize - 1, fileSize - 1); logger.info(`Start: ${start} Filesize: ${fileSize}`); fileSize = fileSize - 1; // let chunkData = fs.readFileSync(cacheFilePath, { start: start, end: fileSize }); let stream = fs.createReadStream(cacheFilePath, { start: start, end: fileSize , highWaterMark: 10 * 1024 * 1024 }); // let chunkData = []; for await (const chunk of stream) { // chunkData.push(chunk); res.write(chunk); } // chunkData = Buffer.concat(chunkData); // res.write(new Buffer.from(chunkData)); // } logger.info('Serving done from cache: ' + cacheFilePath); // backgroundFetch(url); } else { logger.info(`Cache expired for ${cacheFilePath}`); await fetchAndCache(url, res, start, fileSize); } // console.timeEnd('Fetching ' + url); } else { logger.info(`Cache not found for ${cacheFilePath}`); await fetchAndCache(url, res, start, fileSize); } } /** * Asynchronously fetches data from the given URL and caches it in the response stream. * * @param {string} url - The URL to fetch data from * @param {object} res - The response stream to cache the data in * @param {number} start - The start position of the data to fetch * @param {number} fileSize - The total size of the file being fetched */ async function fetchAndCache(url, res, start, fileSize) { backgroundFetch(url); logger.info('Raw Fetching ' + url); let chunkSize = 10 * 1024 * 1024; // 10 MB! for (; start < fileSize; start += chunkSize) { let end = Math.min(start + chunkSize - 1, fileSize - 1); // console.log('Fetch ' + start + ' - ' + end); let headers = { Range: `bytes=${start}-${end}` }; let response = await fetch(url, { headers }); let chunkData = await response.arrayBuffer(); // Or other method for data type res.write(new Uint8Array(chunkData)); } logger.info('Fetching done: ' + url); } async function backgroundFetch(url) { if(fetchingQueue.includes(url)) { logger.warn('Fetching already in queue: ' + url); return; } else { logger.info('Adding to queue: ' + url); fetchingQueue.push(url) logger.info('Ready to cache: ' + url); const response = await fetch(url); const chunkData = await response.arrayBuffer(); // Or other method for data type const cacheFilename = getCacheFilename(url); logger.info("Successfully download cached content: " + url); // remove file if exists // if (fs.existsSync(path.join(TEMP_FILE_DIR, cacheFilename))) { // logger.info('File exists, removing: ' + path.join(TEMP_FILE_DIR, cacheFilename)); // fs.unlinkSync(path.join(TEMP_FILE_DIR, cacheFilename)); // } const writeStream = fs.createWriteStream(path.join(TEMP_FILE_DIR, cacheFilename)); writeStream.write(Buffer.from(chunkData)); writeStream.on('drain', () => { logger.info('Buffer drained, data written'); fetchingQueue.splice(fetchingQueue.indexOf(url), 1); writeStream.end(); }); } } if (cluster.isPrimary) { console.log(`Primary ${process.pid} is running`); // Fork workers. for (let i = 0; i < 4; i++) { cluster.fork(); } cluster.on('exit', function(worker, code, signal) { if (worker.suicide) { console.log(new Date()+' Worker committed suicide'); cluster.fork(); } }); } else { const app = express(); Sentry.init({ dsn: "https://72e49dab320396a1376f4ec248ccf515@o4505724146089984.ingest.us.sentry.io/4506868413693952", integrations: [ // enable HTTP calls tracing new Sentry.Integrations.Http({ tracing: true }), // enable Express.js middleware tracing new Sentry.Integrations.Express({ app }), new ProfilingIntegration(), ], // Performance Monitoring tracesSampleRate: 1.0, // Capture 100% of the transactions // Set sampling rate for profiling - this is relative to tracesSampleRate profilesSampleRate: 1.0, }); // The request handler must be the first middleware on the app app.use(Sentry.Handlers.requestHandler()); // TracingHandler creates a trace for every incoming request app.use(Sentry.Handlers.tracingHandler()); app.use( cors({ origin: "*", }) ); app.get("/", async (req, res) => { // console.log('HERE'); const shared = req.query.shared; const filename = req.query.filename; if (shared) { // My code lol // Api for getting data cids const data = await cache.cache(shared+"metadata", async () => { const APIRES = await fetch( "https://www.ufsdrive.com/api/reqdata?shared=" + shared ); if (!APIRES.ok) { // ctx.waitUntil(log("Get data fail")); logger.warn("File not found"); APIRES.status(404).send("Not found"); } return await APIRES.json(); }, 60); const file = data.data; // console.log(file); // await cache.deleteCache(shared+"size"); var siez; [siez, eachfilesize] = await cache.cache(shared+"size", async () => { logger.warn("Rebuild cache") if (file.profile_picture != "Multipart") { return await getFileSize([file.profile_picture]); } else { return await getFileSize(file.data); } }, 30); // var [siez, eachfilesize]; // console.log(siez) var parts, start, end; if (req.headers.range != null) { // console.log(req.headers.range); parts = req.headers.range.replace(/bytes=/, "").split("-"); start = parseInt(parts[0], 10); end = parts[1] ? parseInt(parts[1], 10) : siez - 1; } data.filename = data.filename.replace(/[^a-zA-Z 0-9\.]+/g, ""); var headers = { "content-type": data.contentType, "Cache-Control": "public, max-age=29030400", "Content-Disposition": `inline; filename="${filename ? filename : data.filename}"`, "Access-Control-Allow-Origin": "*", "Access-Control-Allow-Methods": "GET", "Access-Control-Expose-Headers": "Content-Range, Content-Length, ETag, Access-Control-Allow-Methods, Access-Control-Allow-Origin", Vary: "Origin, Access-Control-Request-Headers, Access-Control-Request-Method, Accept-Encoding", Connection: "keep-alive", "Accept-Ranges": "bytes", }; // if requested has content-range header then we need to change the headers if (req.headers.range) { headers["Content-Range"] = `bytes ${start}-${end}/${siez}`; headers["Content-Length"] = end - start + 1; } else { headers["Content-Length"] = siez; } // console.log(headers); // res.set(headers); if (req.headers.range) { res.writeHead(206, headers); } else { res.writeHead(200, headers); } if (req.headers.range) { // send only requested. // await log("Have range query") logger.info("Range query") if (file.profile_picture != "Multipart") { // await log('One File Query'); logger.info("One file range query") const res2 = await fetch( `https://ipfs.particle.network/${extractCID(file.profile_picture)}`, { method: "GET", headers: { Range: req.headers.range, }, } ); const blob = new Uint8Array(await res2.arrayBuffer()); // controller.enqueue(blob); // controller.close(); logger.info("One file range query done") res.write(blob); res.end(); } else { logger.info("Multiples file range query") var content_range_start = start; var content_range_end = end; var current_range = 0; // console.time("Cached hit") file.data = await cache.cache(shared+"cids", async () => { return file.data.map((cid) => extractCID(cid)); }, 60 * 60 * 24) // console.timeEnd("Cached hit") // console.time("Processing") for (var cid of file.data) { // Skip chunks completely outside the range if (current_range + eachfilesize[cid] <= content_range_start) { current_range += eachfilesize[cid]; continue; } // Calculate the starting byte within the current chunk let start_byte = Math.max(content_range_start - current_range, 0); // Calculate the number of bytes to fetch from this chunk let bytes_to_fetch = Math.min( eachfilesize[cid] - start_byte, content_range_end - current_range ); // Adjust the range for the fetch request let fetch_start = start_byte; let fetch_end = Math.min( start_byte + bytes_to_fetch, eachfilesize[cid] ); await customFetch( `https://ipfs.particle.network/${cid}`, res, fetch_start, fetch_end ) // Update the current range current_range += bytes_to_fetch; // Check if the end of the requested range has been reached if (current_range >= content_range_end) { // controller.close(); // res.end(); logger.warn("File size reach larger than content-range-end"); break; } } logger.info("Multiples file range query done") // controller.close(); res.end(); } } else { logger.info("No range query") if (file.profile_picture != "Multipart") { logger.info("Req one file") // siez = await getFileSize([file.profile_picture]); // log("Comsume data") // const res2 = await fetch( // `https://ipfs.particle.network/${extractCID(file.profile_picture)}` // ); // const blob = new Uint8Array(await res2.arrayBuffer()); // res.write(blob); await customFetch( `https://ipfs.particle.network/${extractCID(file.profile_picture)}`, res, 0, eachfilesize[extractCID(file.profile_picture)] ) logger.info("Req one file done") res.end(); } else { // siez = await getFileSize(file.data); // console.log("Req full file"); logger.info("Req full file") // console.time("Req full file") file.data = await cache.cache(shared+"cids", async () => { return file.data.map((cid) => extractCID(cid)); }, 60 * 60 * 24) for (var cid of file.data) { await customFetch( `https://ipfs.particle.network/${cid}`, res, 0, eachfilesize[cid] ); // controller.enqueue(blob); } // console.log("Done"); logger.info("Req full file done") res.end(); // console.timeEnd("Req full file") } } // log("Success") // const tstream = new TransformStream(); // // ctx.waitUntil(readable.pipeTo(tstream.writable)); // await readable.pipeTo(tstream.writable) // log("Sended!") // if (req.headers.range) { // res.status(206).send(readable); // } else { // res.status(200).send(readable); // } // tstream.readable.pipeTo(res); // res.pipe(tstream.readable); } else { logger.warn("Request fail by missing parameters") res.status(400).send("Missing shared or filename parameter"); } }); app.listen(7860, "0.0.0.0",async () => { // console.log("Server started on port 7860") logger.info("Server started on port 7860") }); }