現在位置: ホーム / ビッグデータ ブログ / TreasureDataのBulkImportによるバッチ処理

TreasureDataのBulkImportによるバッチ処理

TreasureDataでのBulkImportのTipsやスクリプトを作る際の考慮点についての紹介となっています。QiitaのTreasureData Advent Calendarの7日目の記事です。

はじめに

 こんにちは、高橋です。TreasureDataでは、ストリーミングのデータに対してはtd-agentを用いてアップロードを行い、過去データに対してはコマンドラインツールのBulkImportを用いてアップロードを行う方法が一般的です。そのため、短いスパンでファイルが追加されていくようなデータに対しては、td-agentの利用が望ましいのですが、他製品との組み合わせによってはtd-agentを使うことが難しい場合もあります。この場合、FTPサーバやS3などにあるファイルを日・時間単位で定期的にアップロードするために、Bulkimportを使ったシェルスクリプトを作成しバッチで処理する必要があります。
 そこで、今回はBulkimportをスクリプトを作成する際のエラーハンドリングを含めたコツを紹介していきたいと思います。

なぜエラーハンドリングを行うのか?

 そもそも、BulkImportが全てのエラーハンドリングも行ってくれれば簡単なのかもしれません。しかし、データアップロードを行う要件によって、データの内容も更新頻度もいつまでにデータをTreasureData上のクエリ対象となるべきなのか、という最低条件が異なります。そのため、エラーが発生した際にアラートを鳴らし、マニュアルで何らかの対応が必要なのか、次の回のバッチ処理でリカバリできれば良いのかなどを個々で検討する必要があるためです。

BulkImportの処理

 Bulkimport自体の処理は下図の処理フローによって実現されます。

2014-12-22_bulkimportPhases

引用: http://docs.treasure-data.com/articles/bulk-import-internal

Step 0. Create

 BulkImportでは初めにセッションの作成を行います。このセッション名は、名前はユニークになるように設定します。また、定期的に行うBulkImportであれば、日付などを用いると良いです。BulkImportは、このセッションを元にBulkImport用ストレージの処理の状態を扱い、トランザクションを実現します。

$ td import:create <session_name> <db> <table>

 ここでの注意点としては、createのPostメソッドが発行された際に、tdコマンドではリトライがされません。そのため、例えばネットワークに問題があり、エラーメッセージは表示されたが、実際にはセッションが作成されているという状態が発生することがあり得ます。

 そのため、スクリプト内には、リトライ処理としてcreateが失敗した場合には、step5のDeleteを行った後に再度リトライを行う必要があります。 

Step 1. Prepare

 アップロードしたいファイルを、ローカルでファイルをmsgpack形式に変換し、GZIPで圧縮し、ファイルアップロードを効率的に行うための変換処理を行います。ここでは効率的な圧縮のために、スキーマを指定しますが、アップロード後のデータとはあまり関係ないので、Prepare用の型指定だと思っておいてください。
 Prepareをする際には、いろいろオプションがありますが、下記のケースがよくあるので、覚えておくと良いでしょう。

    • ヘッダーがないケース
      • --columnsオプションを付与して、カラムの順番で、カラム名を記述します。英数字小文字にしておきましょう。
      • $ td import:prepare logs/*.csv --format csv --columns time,uid,price,count --time-column time -o parts/
      • ヘッダーがあるけど、別なカラム名を使いたいケース
        • --columnsオプションを利用すると、一行目のヘッダーを読み込んでしまうため、--column-headerも合わせて使いましょう。--column-headerを使うことで1行目のヘッダーを無視しつつ、--column-typesのカラム名を利用することができます。
        • $ td import:prepare logs/*.csv --format csv --columns time,uid,price,count --column-header --time-column time -o parts/
        • データにtimeカラム(unixtime)がないケース
          • --time-value (unixtime)オプションを付与することで、任意のtimeを付与させることができます。注意点として、TreasureDataはtimeを1時間ごとにパーティションを区切って効率的なデータ処理を行うので、全部のデータを--time-value 0にするとクエリの効率などが悪くなります。そのため、データの更新日などでtimeの範囲がわかるのであれば、--time-value UNIXTIME,HOURSのように指定することで、ある時間からHOURS時間の範囲でtimeをばらけてデータを挿入してくれます。
          • 下記の例では、2014/12/22 00:00:00 <= time <= 2014/12/22 23:00:00の範囲でばらけてtimeを付与してくれます(TD_TIME_RANGEでは、starttime <= time < endtimeとなってるので、注意が必要です)。
          • $ td import:prepare logs/*.csv --format csv --column-header --time-value 1419174000,23 -o parts/
        • データの日付のフォーマットがこんなケース
          • オプションで--time-formatを使います。
            • yyyyMMddHHmmss (例: 20141222230000) ちなみにこの場合、該当の数値列を数値だと認識して変換エラー起こすので、明示的にstringとしておく必要があります。
            • $ td import:prepare logs/*.csv --format csv --column-header --time-format '%Y%m%d%H%M%S' --time-column time -o parts/ --all-string
        • カラムに数値列内の文字列が入ってたりするケース
          • あるカラムの値がDBではNULLだったのをファイルに出力した時に、そのまま'NULL'で入ってたりすることがあります。TreasureDataに入れた後でいろいろ考えたらいいので、とりあえず--all-stringオプションをつけておきましょう。
        • 高速に変換したいケース
          • 処理速度自体は、処理用PCの性能に依存するため、同時に変換を行うファイル数を増やします。そのためには、--prepare-parallelオプションを付与します。これは、指定した数だけ同時にファイルを並列処理するため、ファイルの指定に*を利用する必要があります。
         補足として、圧縮されたファイルはsplit-sizeオプションで指定されたサイズ毎にローカルで分割されます。
        そのため、元のファイル数と圧縮されたファイル数は異なるので注意が必要です。また、定期的に処理を行う際には、ローカルの圧縮されたファイルを削除する必要が有ります。
         

        Step 2. Upload

         Prepareで圧縮したファイルをアップロードします。また、ここで指定したファイルがまだPrepareされてない場合は、Step1も同時に行います。また、--auto-createオプションを指定することで、Step0も同時に行えます。

         高速にアップロードを行いたい場合は、--parallelオプションを指定します。Prepareで変換されたファイルを並列にアップロードを行います。また、UploadでPrepareの処理も実行させることによって、Prepareしたファイルから順にアップロードを行っていくことができます。そのため、基本的にはStep2でStep1を兼ねて実行します。

         アップロードされたファイルは、td import:showで確認することができます。また、アップロードされたファイルの数と圧縮されたファイルの数が一致すると全てアップロードできたことが確認できます。その際の注意点として、Uploaded Partsに表示されているファイル名は、'.' が '_ 'に変換される仕様となっているため、標準出力をパースした際に、ローカルのファイルの名前と一致させる際のファイル名の変換が必要になります。

        $ td import:show test
        Name : test
        Database : test
        Table : test
        Status : Ready
        Frozen : true
        JobID : 27362
        Valid Records: 17742
        Error Records: 177
        Valid Parts : 1
        Error Parts : 0
        Uploaded Parts :
        tbl_test_tsv_0_msgpack_gz

        Step 3. Perform

         アップロードされたファイルでは、そのままではTreasureDataでは扱えないため、TreasureDataで扱えるようにMapReduce処理が実行されます。ここで通常のクエリと同様にJobIDが振られます。このPerform処理が完了するまでに--waitオプションを付与することで、Jobが完了するまで待機することができます。スクリプトの場合には、ここでAPIサーバへの通信にエラーが発生した場合に、リトライしてチェックをする仕組みを考えておく必要があります。

         余談ですが、コンソールにログイン後、Collect Data -> Bulk Loaderを押下することでtd import:listの画面が見れます。

        Step4. Commit

         Performまで行った段階で、何件レコードをインポートできるかなどのすべてのValidationが完了しています。このファイルをTreasureDataのクエリ可能なストレージにインポートする処理がCommit処理になっています。このCommitが実行されるまでは、TreasureDataにはデータが反映されないので、レコード件数などのチェックはここまでで行っておくとよいでしょう。実際にTreasureDataでどのように入ったかどうかは、テストテーブルを作成し、実際にクエリをかけてみるのがよいでしょう。

        Step5. Delete

         一連の処理が完了した後は、セッションを削除しましょう。

        Step α. Auto

         上記のStep0~5までをある程度自動で行ってくれるコマンドが、td import:autoコマンドとなっています。

        $ td import:auto <session name> <files...>

         一回で終わるようなアップロード処理の場合は、td import:autoを使うことで簡単にアップロードができると思います。

        Step β. おまけ

         BulkImportで何からのエラーが発生した場合には、標準出力とカレントディレクトリに出力されるtd-bulk-import.logを確認しましょう。また、td-bulk-import.logのログのローテーション条件などは、td-0.11.6/java/logging.propertiesにて指定できます。

        BulkImportバッチ処理システムの構成イメージ

        2014-12-22_system

         外部サービスから定期的や非同期にS3やFTPサーバにアップロードされることを考えます。この場合、どこまで外部サービスがこちらのディレクトリ構成で出力してくれるかを自由にすることは難しいですが、日付毎にフォルダ分けされていくとよいでしょう。また、命名規則を考えておくことで、BulkImportの処理が容易になります。下表のようにしておくと、BulkImportでのファイルパスが日付毎にまとめて行えます。

        $ td import:upload mysess 2014-12-22/*.csv.gz
        ディレクトリ構成例
        2014-12-22 2014-12-22-0100-file1.csv.gz
        2014-12-22-0130-file1.csv.gz
        2014-12-22-0130-file2.csv.gz
        2014-12-21 2014-12-21-2300-file1.csv.gz

        BulkImportのバッチ処理フロー

         BulkImportの処理フローを下図に記載します。下図を参照することで、スクリプトでどのフェーズでのBulkImportの処理でエラーハンドリングを考慮する必要があるかを示しています。

         各菱形の条件を判定するためには、td import:showにより、標準出力の内容をパースしてステータスチェックを行う必要があります。

         最後のFlag Uploaded Files on Storageの部分では、TreasureDataに該当のファイルをアップロードしたかどうかの判別を行うためのフラグを立てています。外部サービスからある1時間分のファイルが送られてきた後、さらにファイルの追加があり、別途アップロードを行う必要があるということはよく起ります。このときに、同一ディレクトリに対して、BulkImportで単純に*を付与すると、再度同じファイルをアップロードしてしまうため、アップロード完了後に完了済みのファイルに対してFlagを立てる処理が必要になります。簡単な方法としては、元のファイルをMoveしたり、Renameしてしまうのが簡単かと思います。

         2014-12-22_activiy

        おわりに

         BulkImportは、コマンドラインツールのために色々と自前で考えることはあります。td-agentやコンソールからのUploaderなども組み合わせて、TreasureDataにデータを貯めていき、データ分析が簡単に行えるようになると良いですね。


        タグ: