Understanding streams in Node.js

11/10/2020

I am stuck on an issue where I need to create and download a zip of multiple files using Node.js. Things I have tried that failed:

https://github.com/archiverjs/node-archiver/issues/364#issuecomment-573508251

https://github.com/kubernetes/kubernetes/issues/90483#issue-606722433

https://stackoverflow.com/questions/60464596/unexpected-behavior-using-zip-stream-npm-on-google-k8s

In addition to this, the files are now encrypted as well, so I have to decrypt them on the fly before adding them to the zip.

I solved that too, and my solution works perfectly while the server is running on a local machine, but it fails on Google Kubernetes Engine.

After some more research, I suspect this might be a backpressure issue in Node.js streams, but as described in the docs, backpressure is handled automatically by the pipe method. Is it possible that the browser's receiving speed does not match the sending/zipping speed of my server? If yes, how do I solve this problem?

All the samples related to the problem are in the links provided above.

In addition to this, the readable stream is passed through a decipher stream to decrypt it.

const Request = require("request");   // streaming HTTP client
const Throttle = require("throttle"); // bandwidth-limiting passthrough stream

const handleEntries = ({ elem, uniqueFiles, archive, speedLimit }) => {
  return new Promise((resolve, reject) => {
    let fileName = elem.fileName;
    const url = elem.url;
    const decipher = elem.decipher;
    // changing fileName if same filename is already added to zip
    if (uniqueFiles[fileName] || uniqueFiles[fileName] === 0) {
      uniqueFiles[fileName]++;
    } else {
      uniqueFiles[fileName] = 0;
    }
    if (uniqueFiles[fileName]) {
      const lastDotIndex = fileName.lastIndexOf(".");
      const name = fileName.substring(0, lastDotIndex);
      const extension = fileName.substring(lastDotIndex + 1);
      fileName = `${name}(${uniqueFiles[fileName]}).${extension}`;
    }
    let readableStream = Request(url);
    // create a "Throttle" instance that reads at speedLimit bps
    if (speedLimit) {
      const throttle = new Throttle({ bps: Number(speedLimit) });
      readableStream = readableStream.pipe(throttle);
    }
    // if file is encrypted, need to decrypt it before piping to zip
    readableStream = decipher ? readableStream.pipe(decipher) : readableStream;
    archive.append(readableStream, { name: fileName });
    readableStream.on("complete", result => {
      console.log("Request stream event complete : ", fileName);
      resolve("done");
      // readableStream.unpipe();
      // readableStream.destroy();
    });
    readableStream
      .on("error", error => {
        console.log("Request stream event error fileName : ", fileName, " error : ", error);
        // readableStream.unpipe();
        // readableStream.destroy();
        resolve("done");
      })
      .on("pipe", result => {
        console.log("Request stream event pipe : ", fileName);
      })
      .on("request", result => {
        console.log("Request stream event request : ", fileName);
      })
      .on("response", result => {
        console.log("Request stream event response : ", fileName);
      })
      .on("socket", result => {
        result.setKeepAlive(true);
        console.log("Request stream event socket : ", fileName);
      });
  });
};
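As a side note, the duplicate-filename bookkeeping above can be pulled out and sanity-checked in isolation (uniqueName is a hypothetical helper mirroring the counters in handleEntries):

```javascript
// Mirrors the uniqueFiles bookkeeping in handleEntries: the first occurrence
// keeps its name, later ones get "(n)" inserted before the extension.
const uniqueName = (uniqueFiles, fileName) => {
  if (uniqueFiles[fileName] || uniqueFiles[fileName] === 0) {
    uniqueFiles[fileName]++;
  } else {
    uniqueFiles[fileName] = 0;
  }
  if (!uniqueFiles[fileName]) return fileName;
  const lastDotIndex = fileName.lastIndexOf(".");
  const name = fileName.substring(0, lastDotIndex);
  const extension = fileName.substring(lastDotIndex + 1);
  return `${name}(${uniqueFiles[fileName]}).${extension}`;
};

const seen = {};
console.log(uniqueName(seen, "report.pdf")); // report.pdf
console.log(uniqueName(seen, "report.pdf")); // report(1).pdf
console.log(uniqueName(seen, "report.pdf")); // report(2).pdf
```

Note that the counter is always keyed on the original name, so renamed entries never collide with each other.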

const Archiver = require("archiver"); // streaming zip generator

const useArchiver = async ({ resp, urls, speedLimit, outputFileName }) => {
  resp.writeHead(200, {
    "Content-Type": "application/zip",
    "Content-Disposition": `attachment; filename="${outputFileName}"`,
    "Access-Control-Allow-Origin": "*",
    "Access-Control-Allow-Methods": "GET, POST, OPTIONS"
  });
  const uniqueFiles = {};
  const archive = Archiver("zip", { zlib: { level: 0 } }); // store only, no compression
  archive.pipe(resp);
  archive
    .on("close", result => {
      console.log("archive stream event close : ", result);
      // archive.unpipe();
      // archive.destroy();
    })
    .on("drain", result => {
      console.log("archive stream event drain : ", result);
    })
    .on("entry", result => {
      console.log("archive stream event entry : ", result.stats);
    })
    .on("error", error => {
      console.log("archive stream event error : ", error);
      resp.destroy(error); // abort the response on archiver error
      // archive.unpipe();
      // archive.destroy();
    })
    .on("finish", result => {
      console.log("archive stream event finish : ", result);
      // archive.unpipe();
      // archive.destroy();
    })
    .on("pipe", result => {
      console.log("archive stream event pipe : ");
    })
    .on("progress", async result => {
      console.log("archive stream event progress : ", result.entries);
      if (urls.length === result.entries.total && urls.length === result.entries.processed) {
        await archive.finalize();
        console.log("finalized : ", urls[0]);
      }
    })
    .on("unpipe", result => {
      console.log("archive stream event unpipe : ");
    })
    .on("warning", result => {
      console.log("archive stream event warning : ", result);
    });
  for (const elem of urls) {
    await handleEntries({ elem, uniqueFiles, archive, speedLimit });
  }
};
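The for...of/await loop at the end matters: it adds entries strictly one at a time, so only one download/decrypt/zip pipeline is active at once. A minimal illustration of the difference (plain promises, no archiver involved):

```javascript
const delay = ms => new Promise(res => setTimeout(res, ms));

const order = [];
const handleOne = async name => {
  order.push(`start ${name}`);
  await delay(10); // stand-in for streaming one file into the zip
  order.push(`end ${name}`);
};

(async () => {
  // Sequential, like the for...of loop in useArchiver: no interleaving,
  // each entry finishes before the next one starts.
  for (const name of ["a", "b"]) {
    await handleOne(name);
  }
  console.log(order.join(", "));
  // start a, end a, start b, end b
})();
```

Kicking all entries off with something like Promise.all instead would interleave them and buffer every source stream at once.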

I tried this code with archiver and I get archiver's drain event while zipping large files. Does pipe handle backpressure or not? If yes, why am I getting the drain event from archiver?

-- Raghu Chahar
archiverjs
kubernetes
node.js
zipstream

2 Answers

11/14/2020

Hey! I've looked through your code and can tell you this: you use a promise-returning function but never wait for it to finish. You need to wrap zipStreamer.entry with await new Promise(). It should look like this:

async function doSmth() {
  const decipher = crypto.createDecipheriv(
    algorithm,
    Buffer.from(key),
    Buffer.from(key.substring(0, 9))
  );
  await new Promise((resolve, reject) => {
    zipStreamer.entry(readableStream.pipe(decipher), {
      name: fileName
    }, (error, result) => {
      if (!error) {
        resolve("done");
      } else {
        reject("error");
      }
    });
  });
}
-- Pavel
Source: StackOverflow

11/20/2020

It seems that I got the solution for this. I made a few changes to the Kubernetes configuration, i.e. increased the timeout from 30 secs to 300 secs and raised the CPU limit, then tested it multiple times with files of up to 12-13 GB, and it works like a charm. I think increasing the CPU from 0.5 to 1 and increasing the timeout did the job for me.
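For anyone hitting the same thing behind a GKE ingress: the response timeout lives on a BackendConfig (which the Service references via the cloud.google.com/backend-config annotation), and the CPU on the container's resource limits. A sketch, assuming a GCE ingress; names are illustrative:

```yaml
apiVersion: cloud.google.com/v1
kind: BackendConfig
metadata:
  name: zip-download-config   # illustrative name
spec:
  timeoutSec: 300             # GCE load balancer default is 30 s
---
# And in the Deployment's container spec:
# resources:
#   limits:
#     cpu: "1"                # raised from 500m
```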

-- Raghu Chahar
Source: StackOverflow