// leeminwaan — Update app/index.js (commit 15b282f, verified)
// simple express app
const express = require("express");
const cluster = require('node:cluster');
const cors = require("cors");
const cache = require("./cache")
const logger = require('./winston');
const fs = require('fs');
const path = require('path');
const crypto = require('crypto');
const Sentry = require("@sentry/node");
// import { ProfilingIntegration } from "@sentry/profiling-node";
const { ProfilingIntegration } = require("@sentry/profiling-node");
// Directory where downloaded IPFS content is cached on disk.
const TEMP_FILE_DIR = path.join(__dirname, 'temp_files');
// How long a cached file stays valid before being re-fetched.
const CACHE_TIMEOUT = 5 * 60 * 1000; //5 minutes in milliseconds
// NOTE(review): directory creation is disabled — createWriteStream in
// backgroundFetch will fail if temp_files/ does not already exist;
// confirm the directory is provisioned elsewhere (e.g. in the image).
// fs.mkdirSync(TEMP_FILE_DIR, { recursive: true });
// Per-CID file sizes in bytes; reassigned per request from the size cache
// and read by customFetch and the route handler (shared worker-wide).
var eachfilesize = {};
// URLs currently being downloaded by backgroundFetch, used to dedupe work.
let fetchingQueue = []
/**
 * Extracts the CID from the given URL.
 *
 * Subdomain-style gateway URLs (hostnames containing "dweb") carry the CID
 * as the first label of the hostname; every other URL carries it as the
 * final path segment.
 *
 * @param {string} url - The URL from which to extract the CID
 * @return {string} The extracted CID
 */
function extractCID(url) {
  if (!url.includes("dweb")) {
    return url.split("/").pop();
  }
  const host = url.split("/")[2];
  return host.split(".")[0];
}
/**
 * Calculate the total size of the files associated with the given content
 * identifiers (CIDs) by issuing HEAD requests to the IPFS gateway.
 *
 * @param {array} cids - Content identifiers (bare CIDs or full https URLs)
 * @return {Promise<[number, Object]>} Tuple of total size in bytes and a
 *   map of CID -> size in bytes.
 */
async function getFileSize(cids) {
  // Local map intentionally shadows the module-level `eachfilesize`;
  // the caller destructures the returned pair back into the global.
  const sizes = {};
  let total = 0;
  for (let cid of cids) {
    if (cid.includes("https://")) {
      cid = extractCID(cid);
    }
    const res = await fetch(`https://ipfs.particle.network/${cid}`, {
      method: "HEAD",
    });
    // Guard against a missing Content-Length header: parseInt(null, 10)
    // is NaN and would poison the running total and the size map.
    const size = Number.parseInt(res.headers.get("Content-Length"), 10) || 0;
    total += size;
    sizes[cid] = size;
  }
  return [total, sizes];
}
/**
 * Formats the given number of bytes into a human-readable string representation.
 *
 * @param {number} bytes - The number of bytes to be formatted.
 * @param {number} [decimals=2] - The number of decimals to round the result to.
 * @return {string} The formatted string representation of the input bytes.
 */
function formatBytes(bytes, decimals = 2) {
  // `!+bytes` catches 0, NaN, null, '' — anything that coerces to 0/NaN.
  if (!+bytes) return "0 Bytes";
  const k = 1024;
  const dm = decimals < 0 ? 0 : decimals;
  const sizes = [
    "Bytes",
    "KiB",
    "MiB",
    "GiB",
    "TiB",
    "PiB",
    "EiB",
    "ZiB",
    "YiB",
  ];
  // Clamp the unit index so inputs >= 1024^9 still map to the largest
  // known unit instead of indexing past the end of `sizes` (undefined).
  const i = Math.min(
    Math.floor(Math.log(bytes) / Math.log(k)),
    sizes.length - 1
  );
  return `${parseFloat((bytes / Math.pow(k, i)).toFixed(dm))} ${sizes[i]}`;
}
/**
 * Create a safe filename based on the URL (hashing, etc.).
 *
 * Hashing avoids any path-unsafe characters from the original URL.
 *
 * @param {string} url - The URL to create the filename from
 * @return {string} The safe filename created from the URL
 */
function getCacheFilename(url) {
  const digest = crypto.createHash('md5').update(url).digest('hex');
  return `cached_${digest}`;
}
/**
 * Serve bytes [start, fileSize) of `url` into the Express response, using
 * the on-disk cache when a fresh copy exists, otherwise falling back to
 * ranged upstream fetches via fetchAndCache.
 *
 * Never calls res.end(); the caller owns the response lifecycle.
 *
 * @param {string} url - The upstream (IPFS gateway) URL to serve
 * @param {object} res - Express response; chunks are written to it
 * @param {number} start - First byte offset to serve (inclusive)
 * @param {number} fileSize - End offset; callers pass an exclusive upper
 *   bound (fetch_end / total size), converted to an inclusive stream end
 *   below via `fileSize - 1`
 */
async function customFetch(url, res, start, fileSize) {
logger.info('Fetching ' + url);
const cacheFilename = getCacheFilename(url);
const cacheFilePath = path.join(TEMP_FILE_DIR, cacheFilename);
// Sanity check only: a requested end past the known size is logged,
// not rejected — the read/fetch below proceeds regardless.
if(fileSize > eachfilesize[extractCID(url)]) {
logger.error(`File size are larger than the actual file`);
}
// Check for cached content
if (fs.existsSync(cacheFilePath)) {
const fileStats = fs.statSync(cacheFilePath);
const fileAge = Date.now() - fileStats.mtimeMs;
// Only serve from disk while the cached copy is younger than CACHE_TIMEOUT.
if (fileAge < CACHE_TIMEOUT) {
logger.info(`Serving from cache: ${cacheFilePath}`);
// for (; start < fileSize; start += chunkSize) {
// let end = Math.min(start + chunkSize - 1, fileSize - 1);
logger.info(`Start: ${start} Filesize: ${fileSize}`);
// createReadStream's `end` option is inclusive, so convert the
// exclusive upper bound to the last byte index.
fileSize = fileSize - 1;
// let chunkData = fs.readFileSync(cacheFilePath, { start: start, end: fileSize });
let stream = fs.createReadStream(cacheFilePath, { start: start, end: fileSize , highWaterMark: 10 * 1024 * 1024 });
// let chunkData = [];
// Stream the file in 10 MiB chunks; backpressure from res.write is
// NOT awaited here — TODO confirm memory behavior for slow clients.
for await (const chunk of stream) {
// chunkData.push(chunk);
res.write(chunk);
}
// chunkData = Buffer.concat(chunkData);
// res.write(new Buffer.from(chunkData));
// }
logger.info('Serving done from cache: ' + cacheFilePath);
// backgroundFetch(url);
} else {
// Stale cache: serve from upstream and trigger a background refresh.
logger.info(`Cache expired for ${cacheFilePath}`);
await fetchAndCache(url, res, start, fileSize);
}
// console.timeEnd('Fetching ' + url);
} else {
// No cache file yet: serve from upstream and trigger a background fill.
logger.info(`Cache not found for ${cacheFilePath}`);
await fetchAndCache(url, res, start, fileSize);
}
}
/**
 * Stream bytes [start, fileSize) of `url` into the response via ranged
 * upstream fetches, and kick off a background download to populate the
 * on-disk cache for the next request.
 *
 * @param {string} url - The URL to fetch data from
 * @param {object} res - The response stream to write the data into
 * @param {number} start - The start position of the data to fetch
 * @param {number} fileSize - Exclusive end offset of the data to fetch
 */
async function fetchAndCache(url, res, start, fileSize) {
  // Fire-and-forget cache fill. The .catch is essential: without it a
  // failed background download becomes an unhandled promise rejection,
  // which can terminate the whole worker process.
  backgroundFetch(url).catch((err) => {
    logger.error('Background caching failed for ' + url + ': ' + err);
  });
  logger.info('Raw Fetching ' + url);
  const chunkSize = 10 * 1024 * 1024; // 10 MB per ranged request
  for (; start < fileSize; start += chunkSize) {
    // HTTP Range end is inclusive, hence the -1 on both bounds.
    const end = Math.min(start + chunkSize - 1, fileSize - 1);
    const headers = { Range: `bytes=${start}-${end}` };
    const response = await fetch(url, { headers });
    const chunkData = await response.arrayBuffer(); // Or other method for data type
    res.write(new Uint8Array(chunkData));
  }
  logger.info('Fetching done: ' + url);
}
/**
 * Download `url` once in the background and persist it to the temp-file
 * cache directory. The in-memory fetchingQueue dedupes concurrent
 * downloads of the same URL within this worker.
 *
 * @param {string} url - The URL to download and cache
 */
async function backgroundFetch(url) {
  if (fetchingQueue.includes(url)) {
    logger.warn('Fetching already in queue: ' + url);
    return;
  }
  logger.info('Adding to queue: ' + url);
  fetchingQueue.push(url);
  try {
    logger.info('Ready to cache: ' + url);
    const response = await fetch(url);
    const chunkData = await response.arrayBuffer(); // Or other method for data type
    const cacheFilename = getCacheFilename(url);
    logger.info("Successfully download cached content: " + url);
    // writeFile flushes and closes the handle. The original relied on the
    // write stream's 'drain' event, which only fires when write() returned
    // false; for any payload that fit in the stream buffer the stream was
    // never ended and the URL stayed in fetchingQueue forever, permanently
    // blocking re-caching of that URL and leaking the file handle.
    await fs.promises.writeFile(
      path.join(TEMP_FILE_DIR, cacheFilename),
      Buffer.from(chunkData)
    );
    logger.info('Buffer drained, data written');
  } finally {
    // Always release the queue slot, even when the download fails.
    const idx = fetchingQueue.indexOf(url);
    if (idx !== -1) {
      fetchingQueue.splice(idx, 1);
    }
  }
}
// Cluster entry point: the primary forks workers, and each worker runs its
// own copy of the Express app below, sharing port 7860.
if (cluster.isPrimary) {
console.log(`Primary ${process.pid} is running`);
// Fork workers.
for (let i = 0; i < 4; i++) {
cluster.fork();
}
// NOTE(review): `worker.suicide` was deprecated and renamed
// `worker.exitedAfterDisconnect` in Node; on modern versions this respawn
// branch may never fire — confirm the target Node version.
cluster.on('exit', function(worker, code, signal) {
if (worker.suicide) {
console.log(new Date()+' Worker committed suicide');
cluster.fork();
}
});
} else {
const app = express();
// Sentry must be initialized before any middleware so the request and
// tracing handlers below can wrap the app.
Sentry.init({
dsn: "https://72e49dab320396a1376f4ec248ccf515@o4505724146089984.ingest.us.sentry.io/4506868413693952",
integrations: [
// enable HTTP calls tracing
new Sentry.Integrations.Http({ tracing: true }),
// enable Express.js middleware tracing
new Sentry.Integrations.Express({ app }),
new ProfilingIntegration(),
],
// Performance Monitoring
tracesSampleRate: 1.0, // Capture 100% of the transactions
// Set sampling rate for profiling - this is relative to tracesSampleRate
profilesSampleRate: 1.0,
});
// The request handler must be the first middleware on the app
app.use(Sentry.Handlers.requestHandler());
// TracingHandler creates a trace for every incoming request
app.use(Sentry.Handlers.tracingHandler());
// Allow any origin to fetch from this proxy.
app.use(
cors({
origin: "*",
})
);
// Main route: proxies a shared file (single CID or multipart CID list)
// from the IPFS gateway to the client, honoring HTTP Range requests.
app.get("/", async (req, res) => {
// console.log('HERE');
const shared = req.query.shared;
const filename = req.query.filename;
if (shared) {
// My code lol
// Api for getting data cids
// Fetch (and cache for 60s) the share's metadata from the upstream API.
const data = await cache.cache(shared+"metadata", async () => {
const APIRES = await fetch(
"https://www.ufsdrive.com/api/reqdata?shared=" + shared
);
if (!APIRES.ok) {
// ctx.waitUntil(log("Get data fail"));
logger.warn("File not found");
// NOTE(review): APIRES is a fetch Response, which has no
// .status().send() — this line throws on an upstream failure instead
// of returning a 404 to the client. Confirm intended error handling.
APIRES.status(404).send("Not found");
}
return await APIRES.json();
}, 60);
const file = data.data;
// console.log(file);
// await cache.deleteCache(shared+"size");
// Total size plus per-CID sizes, cached for 30s. Note this assigns the
// module-level `eachfilesize`, shared by all requests in this worker.
var siez;
[siez, eachfilesize] = await cache.cache(shared+"size", async () => {
logger.warn("Rebuild cache")
if (file.profile_picture != "Multipart") {
return await getFileSize([file.profile_picture]);
} else {
return await getFileSize(file.data);
}
}, 30);
// var [siez, eachfilesize];
// console.log(siez)
// Parse the Range header ("bytes=start-end"); a missing end means EOF.
var parts, start, end;
if (req.headers.range != null) {
// console.log(req.headers.range);
parts = req.headers.range.replace(/bytes=/, "").split("-");
start = parseInt(parts[0], 10);
end = parts[1] ? parseInt(parts[1], 10) : siez - 1;
}
// Strip everything but alphanumerics, spaces and dots so the filename is
// safe inside the Content-Disposition header.
data.filename = data.filename.replace(/[^a-zA-Z 0-9\.]+/g, "");
var headers = {
"content-type": data.contentType,
"Cache-Control": "public, max-age=29030400",
"Content-Disposition": `inline; filename="${filename ? filename : data.filename}"`,
"Access-Control-Allow-Origin": "*",
"Access-Control-Allow-Methods": "GET",
"Access-Control-Expose-Headers":
"Content-Range, Content-Length, ETag, Access-Control-Allow-Methods, Access-Control-Allow-Origin",
Vary: "Origin, Access-Control-Request-Headers, Access-Control-Request-Method, Accept-Encoding",
Connection: "keep-alive",
"Accept-Ranges": "bytes",
};
// if requested has content-range header then we need to change the headers
if (req.headers.range) {
headers["Content-Range"] = `bytes ${start}-${end}/${siez}`;
headers["Content-Length"] = end - start + 1;
} else {
headers["Content-Length"] = siez;
}
// console.log(headers);
// res.set(headers);
// 206 Partial Content for range requests, plain 200 otherwise.
if (req.headers.range) {
res.writeHead(206, headers);
} else {
res.writeHead(200, headers);
}
if (req.headers.range) {
// send only requested.
// await log("Have range query")
logger.info("Range query")
if (file.profile_picture != "Multipart") {
// await log('One File Query');
logger.info("One file range query")
// Single-CID file: forward the client's Range header to the gateway.
const res2 = await fetch(
`https://ipfs.particle.network/${extractCID(file.profile_picture)}`,
{
method: "GET",
headers: {
Range: req.headers.range,
},
}
);
const blob = new Uint8Array(await res2.arrayBuffer());
// controller.enqueue(blob);
// controller.close();
logger.info("One file range query done")
res.write(blob);
res.end();
} else {
logger.info("Multiples file range query")
// Multipart file: walk the CID list, mapping the global byte range onto
// per-chunk offsets, and stream only the chunks that overlap the range.
var content_range_start = start;
var content_range_end = end;
var current_range = 0;
// console.time("Cached hit")
file.data = await cache.cache(shared+"cids", async () => {
return file.data.map((cid) => extractCID(cid));
}, 60 * 60 * 24)
// console.timeEnd("Cached hit")
// console.time("Processing")
for (var cid of file.data) {
// Skip chunks completely outside the range
if (current_range + eachfilesize[cid] <= content_range_start) {
current_range += eachfilesize[cid];
continue;
}
// Calculate the starting byte within the current chunk
let start_byte = Math.max(content_range_start - current_range, 0);
// Calculate the number of bytes to fetch from this chunk
let bytes_to_fetch = Math.min(
eachfilesize[cid] - start_byte,
content_range_end - current_range
);
// Adjust the range for the fetch request
let fetch_start = start_byte;
let fetch_end = Math.min(
start_byte + bytes_to_fetch,
eachfilesize[cid]
);
await customFetch(
`https://ipfs.particle.network/${cid}`,
res,
fetch_start,
fetch_end
)
// Update the current range
current_range += bytes_to_fetch;
// Check if the end of the requested range has been reached
if (current_range >= content_range_end) {
// controller.close();
// res.end();
logger.warn("File size reach larger than content-range-end");
break;
}
}
logger.info("Multiples file range query done")
// controller.close();
res.end();
}
} else {
logger.info("No range query")
if (file.profile_picture != "Multipart") {
logger.info("Req one file")
// siez = await getFileSize([file.profile_picture]);
// log("Comsume data")
// const res2 = await fetch(
// `https://ipfs.particle.network/${extractCID(file.profile_picture)}`
// );
// const blob = new Uint8Array(await res2.arrayBuffer());
// res.write(blob);
// Full single-CID file: stream from byte 0 through the cache layer.
await customFetch(
`https://ipfs.particle.network/${extractCID(file.profile_picture)}`,
res,
0,
eachfilesize[extractCID(file.profile_picture)]
)
logger.info("Req one file done")
res.end();
} else {
// siez = await getFileSize(file.data);
// console.log("Req full file");
logger.info("Req full file")
// console.time("Req full file")
// Full multipart file: stream every chunk in order via the cache layer.
file.data = await cache.cache(shared+"cids", async () => {
return file.data.map((cid) => extractCID(cid));
}, 60 * 60 * 24)
for (var cid of file.data) {
await customFetch(
`https://ipfs.particle.network/${cid}`,
res,
0,
eachfilesize[cid]
);
// controller.enqueue(blob);
}
// console.log("Done");
logger.info("Req full file done")
res.end();
// console.timeEnd("Req full file")
}
}
// log("Success")
// const tstream = new TransformStream();
// // ctx.waitUntil(readable.pipeTo(tstream.writable));
// await readable.pipeTo(tstream.writable)
// log("Sended!")
// if (req.headers.range) {
// res.status(206).send(readable);
// } else {
// res.status(200).send(readable);
// }
// tstream.readable.pipeTo(res);
// res.pipe(tstream.readable);
} else {
logger.warn("Request fail by missing parameters")
res.status(400).send("Missing shared or filename parameter");
}
});
app.listen(7860, "0.0.0.0",async () => {
// console.log("Server started on port 7860")
logger.info("Server started on port 7860")
});
}