Node.jsでCSVを1行ずつ処理する(async generator)

時計アイコン公開日:2022/10/13
時計アイコン更新日:2022/10/13
最後まで読まないと結論が載ってない系の記事です

ExcelやCSVをプログラムで処理したいケースはよくあると思います。 みなさんはどうやって実装しますか?

人によってアプローチは異なると思いますが、

CSVといってもテキストファイルの処理なので、大抵の言語で実装できるでしょう。

CSVなので、行ごとにカンマで区切って処理すればいいだけです。

ファイルを一度全部読み込んでから行ごとにループ処理するアプローチがありますが、

メモリ効率が悪くファイルサイズが大きくなると対応できないため、

行ごとに順番に読み出していく処理が必要になります。

Pythonだと標準モジュールにCSVモジュールがあって、

普通にfor文で1行づつ処理できるのでサクッと書きたい時には重宝します。

今回はTypeScriptで同様の書き味ができるようにしてみたので紹介です。

Node.jsのreadline

Node.jsにはストリームを行単位で取り扱うことのできるreadLineモジュールが標準で提供されています。

https://nodejs.org/api/readline.html

Node.jsは非同期のイベント駆動なので、基本はイベントにコールバック処理を定義する形になります。

readlineのcreateInterfaceを使うことによって、lineという行の読み取りイベントを受け取ることができるので、

1行の処理はここに書くことができます。

↓こんな感じですね。

import fs from "fs";
import readline from "readline";

const rl = readline.createInterface({
  input: fs.createReadStream(path),
  crlfDelay: Infinity,
});

rl.on("line", (line) => {
  // 1行分のデータに対してなんかする
});

サクッと書くだけでいいならこれでいいのですが、

イベントのコールバックに処理を書くため、読み取り結果を外のスコープに返すときにちょっと面倒だったりします。

全部を読み取った後にPromiseで返すことはできますが、全行を保存するバッファを用意しないといけないので趣旨からは外れてしまいます。

async iterator

少し脱線します。

JavaScriptにはfor of という構文がありますよね。

for (const element of array) {}

arrayが配列の場合、elementには要素が順番に渡されるという仕様ですが、

Symbol.iteratorというメソッドが実装されていれば配列以外でもこの構文を使うことができます。

※配列やMapなどの反復可能なオブジェクトには標準で実装されています。

詳しくは以下が参考になります。

https://developer.mozilla.org/ja/docs/Web/JavaScript/Reference/Global_Objects/Symbol/iterator

https://ja.javascript.info/async-iterators-generators#ref-2055

そして、Symbol.asyncIteratorが実装されている場合は非同期に反復処理ができます。

つまり、for await of構文が使えます。

https://developer.mozilla.org/ja/docs/Web/JavaScript/Reference/Statements/for-await...of

https://ja.javascript.info/async-iterators-generators#ref-2056

for await of の構文はこんな感じです。

for await (const element of asyncArray) {}

非同期に反復可能なモノにたいして使うことができます。

使うケースが限られているのもあってイメージがつきにくいのですが、

readlineのcreateinterfaceasyncIterable(async iteratorが実装されている)なオブジェクトを返してくれるので、

このfor await ofを使うことができます。

※Node.js12とかからだった気がする

for await of を使って1行ずつ処理する

こんな感じです。

import fs from "fs";
import readline from "readline";

const rl = readline.createInterface({
  input: fs.createReadStream(path),
  crlfDelay: Infinity,
});

for await (line of rl) {
  // 1行分のデータに対してなんかする
}

ふむふむなるほど?

確かにfor await of でスッキリ書けるようになってるけど、

スコープ外にデータを持ち出せないのは変わってなくない?

そうです。ただfor await of で処理できるだけでは旨味がありません。

この処理をasync generatorで包むことで真価を発揮します。

async generatorな関数にする

https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Global_Objects/AsyncGenerator

https://ja.javascript.info/async-iterators-generators#ref-2058

その名の通り非同期なジェネレーターです。

詳しいことは省略しますが、ジェネレーター内ではyieldが使えるので、

任意のタイミングで値を返すことができます。

さきほどの処理をasync generatorで包むとこうなります。

const asyncReader = async function* (path: string): AsyncGenerator<string[]> {
  const rl = readline.createInterface({
    input: fs.createReadStream(path),
    crlfDelay: Infinity,
  });

  for await (const line of rl) {
    // 1行ごとの処理
    const result = line.split(",")
    yield result;
  }
};

csvなのでカンマ区切りのデータを配列に分解してから呼び出し元に戻しています。

使用側はこうなります。

const reader = asyncReader(path)

// next()で1行ずつ読み出し
(await reader.next()).value;

// for await of も使える
for await (const line of reader) {
  console.log(line[0])
};

ただのサンプルですが、CSVを読み取るプログラムとデータ処理を分離することができましたね。

yieldで行ごとにデータを返しているため、バッファで全体を保持する必要がなくメモリ効率がよくなります。

※1行分のデータが重い場合はこの方法では解決できませんが、、

CSVの改行を正しく扱えない問題

1行ずつデータを処理できるようになりましたが、

readlineの行イベントをそのまま処理しているため、CSVのデータ内(セル内)に改行があった場合対処できません。

CSVの仕様としては、ダブルクォーテーションで囲われた文字列内の改行がデータ内の改行として扱われる仕様なので、

読み取るプログラムを書く場合はこれを考慮する必要があります。

readlineはCSVに特化したものでもないので、自分で処理を書くことになります。

さきほどの、async generator関数です。

yieldしてしまうと呼び出し元にデータが返ってしまうため、

それより前段で判定して次の行データを取得する必要があります。

const asyncReader = async function* (path: string): AsyncGenerator<string[]> {
  const rl = readline.createInterface({
    input: fs.createReadStream(path),
    crlfDelay: Infinity,
  });

  for await (const line of rl) {
    if (データ内改行がある場合) {
        // 次の行も取得してまとめたい
    }
    yield line;
  }
};

async generator関数で使える**next()**を使いたくなりますが、

readlineのcreateInterfaceはジェネレーターではないので残念ながら使えません。

  for await (let line of rl) {
    if (データ内改行がある場合) {
        // next()で次の行を取得したいが、できない
        line += await rl.next()
    }
    yield line;
  }

そうだ!もう1つジェネレーターを挟もう!

私が思いついた解決策は1行分の文字列をyieldするだけのジェネレーターを作成することでした。

const asyncReader = async function* (path: string): AsyncGenerator<string> {
 const rl = readline.createInterface({
  input: fs.createReadStream(path),
  crlfDelay: Infinity,
 });

 for await (const line of rl) {
  yield line;
 }
};

そして、呼び出し元に返す関数を別で作成します

export const lineParser = async function* (path: string): AsyncGenerator<string[]> {
  const reader = asyncReader(path);

  for await (let line of reader) {
    while (isNeedNextLine(line)) {
   line = line + "\n" + (await reader.next()).value
    }

    yield values = line.split(",");
  }
};

const isNeedNextLine = (line: string): boolean => {
  const matched = line.match(/"/g);
  // クォーテーションなしはセル内改行なし
  if (!matched) {
    return false
  }

  // クォーテーションが奇数=セル内改行あり
  return matched.length % 2 === 1;
};

データ内に改行がある場合、対になっていないダブルクォーテーションが存在するので、

1行に含まれるダブルクォーテーションの数で判定しています。

読み取りをジェネレーターで包んだため**next()**が使えるようになったので、

改行が含まれる限りwhileで繋げるように処理を書いています。

これで、データ内の改行を考慮することができるようになりました。

※改行を繋げた結果の文章が長すぎるとメモリオーバーする可能性はありますが、、、

もっとTypeScriptっぽく書きたい。

先ほどの処理を使うと、読み取り処理はこうなります。

const parser = lineParser(path)

for await (const line of parser) {
  console.log(line[0])
};

カンマ区切りで配列にしていたので、個々の値を取得する場合は要素を指定する必要があります。

文字列であるということがわかっているだけで、全くTypeScriptみを感じませんね。

少し手を加えてTypeScriptっぽくしていきます。

まず、カラム定義ファイルを用意します。

大抵の場合、読み取りたいファイルのカラム定義は決まっていると思います。

1列目にID、2列目に名前、といった感じですね。

決まっているのであれば、TypeScriptのファイルで定義してみましょう。

サンプルのCSVはこちらです

fire water grass
Charmander Squirtle Bulbasaur
Charmeleon Wartortle Ivysaur
Chalizard Blastoise Venusaur

このCSVに対し、どの列にどんな値が入っているかを定義したいので、 カラム定義のコードははこうなります。

const StarterPokemonColumn = {
  fire: 0,
  water: 1,
  grass: 2,
} as const;

これを先ほどのlineParserに渡せるように変更します。

type LineObject<T> = Record<keyof T, string>;

export const lineParser = async function* <T extends { [key: string]: number }>(
 path: string,
 columns: T,
 useHeader = false
): AsyncGenerator<LineObject<T>> {
 const reader = asyncReader(path);

 if (!useHeader) {
  await reader.next();
 }

 for await (let line of reader) {
  while (isNeedNextLine(line)) {
   line = line + "\n" + (await reader.next()).value
  }

  const values = line.split(",");

  // 渡されたカラム定義に従ってオブジェクトを作成する
  const lineObject = Object.entries(columns).reduce(
   (obj, [key, position]) => ({ ...obj, [key]: values[position] }),
   {} as LineObject<T>
  );

  yield lineObject;
 }
};

呼び出し側はこのように書くことができます。

const parser = lineParser(path)

for await (const line of parser) {
  // lineに対し、補完が効くようになる
  console.log(`noEvolved fire:` ${line.fire})
}

line[0]とやっていることは変わりませんが、
どの列が何の値かを常に参照しながらプログラムを書くのはしんどい上に、
ミスがあっても実行時まで気づけないという問題があります。
このように自分でカラム定義を書くことで、TypeScriptっぽい書き味になりました。

まとめ

以上、TypeScript(Node.js)でCSVを1行ずつ処理する、でした。
他の人の「自分ならこうする!」を聞いてみたいなと思います。

以下、最終的なコードです。

import fs from "fs";
import readline from "readline";

export type LineObject<T> = Record<keyof T, string>;

const asyncReader = async function* (path: string): AsyncGenerator<string> {
 const rl = readline.createInterface({
  input: fs.createReadStream(path),
  crlfDelay: Infinity,
 });

 for await (const line of rl) {
  yield line;
 }
};

export const lineParser = async function* <T extends { [key: string]: number }>(
 path: string,
 columns: T,
 useHeader = false
): AsyncGenerator<LineObject<T>> {
 const reader = asyncReader(path);

 if (!useHeader) {
  await reader.next();
 }

 for await (let line of reader) {
  while (isNeedNextLine(line)) {
   line = line + "\n" + (await reader.next()).value
  }

  const values = line.split(",");
  const lineObject = Object.entries(columns).reduce(
   (obj, [key, position]) => ({ ...obj, [key]: values[position] }),
   {} as LineObject<T>
  );

  yield lineObject;
 }
};

const isNeedNextLine = (line: string): boolean => {
 const matched = line.match(/"/g);
 // クォーテーションなしはセル内改行なし
 if (!matched) {
  return false
 }

 // クォーテーションが奇数=セル内改行あり
 return matched.length % 2 === 1;
}