Apache Beam with Google Cloud Dataflow(over 2.0.x系)入門~Combine~

Apache Beam with Google Cloud Dataflow(over 2.0.x系)入門~Combine~

Apache Beamの5つのCore Transformの内の1つ、Combineの基本的な使い方について記す。
他のCore TransformやそもそものApache Beam 2.0.xの基本的な話は以下に記述している。

IntelliJとGradleで始めるApache Beam 2.0.x with Google Cloud Dataflow - Qiita

Apache Beam with Cloud Dataflow(over 2.0.0系)入門~基本部分~ParDoまで~ - Qiita

Apache Beam with Google Cloud Dataflow(over 2.0.x系)入門~基本的なGroupByKey編~ - Qiita

なお、本記事は以下の2つの公式ドキュメントを参考に記述している。

Beam Programming Guide

コレクションと値の結合 | Cloud Dataflow のドキュメント | Google Cloud Platform

Combineの2つの役割

Combineは、PCollection内に存在する各要素(各データ)を結合したり、マージする。
Map/Shuffle/ReduceでいうところのReduceのようなものだと認識している。

Combineの仕方は大きく分けて2つ存在する。
「1つのPCollection内に存在する要素を結合して、1つの値を生成する方法」と「KeyによってGroup化されたPCollectionのValue部分の各要素を結合して、1つの値を生成する方法」である。
以下、各々の方法を記したい。

1つのPCollection内に存在する要素を結合して、1つの値を生成する方法

1つのPCollection内に存在する要素を結合して、1つの値を生成する方法とは

PCollection内の各要素を結合する。
=>これはParDoとの違いに注意する必要がある。
ParDoは、PCollection内の各々の要素に対して何らかの処理を行う。
Combineは、PCollection内の各要素を結合する。

例えば、PCollection内に存在する要素を結合して、1つの値を生成する場合がこれ。

PCollection<Integer> sum = pCollection.apply(Sum.integersGlobally());

一見、Combineが存在しないように見えるが、Sum.integersGlobally()が、 Combine.globallyをwrapしている。実際のSum.integersGlobally()は以下。

public static Combine.Globally<Integer, Integer> integersGlobally() {
  return Combine.globally(Sum.ofIntegers());}

参考 API リファレンス

withoutDefaults()

空のPCollectionがinoutとして与えられた場合に、emptyを返したいなら withoutDefaults()をつける。

PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());

Global Windowの場合と非Global Windowの場合の動作の違い

Global Windowの場合には、1 つの項目を含んだ PCollection を返すことがデフォルトの動作になっている。

一方、非Global Windowの場合、上記のようなデフォルトの動作はしない。
Combineを使用する際に、Optionを指定する。
公式がわかりやすかったので、以下引用。(本投稿執筆時には、Apache Beam 2.0.xの方のDocumentにはまだこの記載が存在しなかったため、Google Cloud Dataflow1.9の方の公式ドキュメントから引用させていただいている)

.withoutDefaults を指定する。この場合、入力 PCollection 内の空のウィンドウは、出力>コレクションでも空になります。

.asSingletonView を指定する。この場合、出力は直ちに PCollectionView へと変換されます。これは、それぞれの空ウィンドウが副入力として使用される場合のデフォルト値になります。通常、このオプションは、パイプラインの Combine の結果が後にパイプライン内で副入力として使用される場合にのみ、使用する必要があります。

引用元 : コレクションと値の結合 | Cloud Dataflow のドキュメント | Google Cloud Platform

実際にコードを書いてみた

各処理は、コードにコメントとして記載している。
理解を優先するため、メソッドチェーンを極力使用していない。
そのため、冗長なコードになっている。

package com.company;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.PCollection;

/**
 * メインクラス
 */
public class Main {
    /**
     * 関数型オブジェクト
     * String => Integerの型変換を行う
     */
    static class TransformTypeFromStringToInteger extends DoFn<String, Integer> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // 要素をString=>Integerに変換して、output
            c.output(Integer.parseInt(c.element()));
        }
    }

    /**
     * 関数型オブジェクト
     * Integer =>Stringの型変換を行う
     */
    static class TransformTypeFromIntegerToString extends DoFn<Integer, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // 要素をString=>Integerに変換して、output
            System.out.println(c.element());
            c.output(String.valueOf(c.element()));
        }
    }


    /**
     * インプットデータのパス
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     * アウトデータのパス
     */
    private static final String OUTPUT_FILE_PATH = "./result.txt";

    /**
     * 理解のためにメソッドチェーンは極力使用しない
     * そのため冗長な箇所がある
     * メインメソッド
     *
     * @param args
     */
    public static void main(String[] args) {
        // optionを指定して、Pipelineを生成する
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

        System.out.println("a");
        // ファイルから読み込み
        PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));
        // 読み込んだ各データをString => Integerに変換
        PCollection<Integer> integerPCollection = lines.apply(ParDo.of(new TransformTypeFromStringToInteger()));
        // Combine.GloballyでPCollectionの各要素を合計
        // 空のPCollectionの場合、emptyを返したいなら => PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());
        PCollection<Integer> sum = integerPCollection.apply(Sum.integersGlobally().withoutDefaults());
        // PCollection<Integer> sumをInteger => Stringに変換
        PCollection<String> sumString = sum.apply(ParDo.of(new TransformTypeFromIntegerToString()));
        // ファイルに書き込み
        sumString.apply(TextIO.write().to(OUTPUT_FILE_PATH));

        // 実行
        pipeline.run().waitUntilFinish();
    }
}

実施にコードを書いてみた(メソッドチェーンを使ったver)

だいぶすっきりした

package com.company;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;


/**
 * メインクラス
 */
public class Main {
    /**
     * 関数型オブジェクト
     * String => Integerの型変換を行う
     */
    static class TransformTypeFromStringToInteger extends DoFn<String, Integer> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // 要素をString=>Integerに変換して、output
            c.output(Integer.parseInt(c.element()));
        }
    }

    /**
     * 関数型オブジェクト
     * Integer =>Stringの型変換を行う
     */
    static class TransformTypeFromIntegerToString extends DoFn<Integer, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // 要素をString=>Integerに変換して、output
            System.out.println(c.element());
            c.output(String.valueOf(c.element()));
        }
    }


    /**
     * インプットデータのパス
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     * アウトデータのパス
     */
    private static final String OUTPUT_FILE_PATH = "./result.txt";

    /**
     * メインメソッド
     *
     * @param args
     */
    public static void main(String[] args) {
        // Pipeline生成
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

        // 処理部分
        pipeline.apply(TextIO.read().from(INPUT_FILE_PATH))
                .apply(ParDo.of(new TransformTypeFromStringToInteger()))
                .apply(Sum.integersGlobally().withoutDefaults())
                .apply(ParDo.of(new TransformTypeFromIntegerToString()))
                .apply(TextIO.write().to(OUTPUT_FILE_PATH));

        // 実行
        pipeline.run().waitUntilFinish();
    }
}

読み込んだファイル

1
2
3
4
5
6
7
8
9
10

実行結果

result.txt-00000-of-00001 が出力される
result.txt-00000-of-00001の中身は

55

やっていることは、

10
Σk
k = 1

みたいなもん。

PerKey

GroupByKeyを行うと K,V(IterableなCollection)になる。
例えば、以下のようになる。

Java [1, 2, 3]

CombineのPerKeyは、このK,V[IterableなCollection]のV[IterableなCollection]部分をKey毎に結合する。なので、例えば上記のGroupByKey後のK,V(IterableなCollection)をCombine PerKeyを行うと以下のようになる。

Java [6]

K,V(IterableなCollection)の,V(IterableなCollection)の要素がすべて結合された。

実際にコードを書いてみた

各処理は、コードにコメントとして記載している。
理解を優先するため、メソッドチェーンを極力使用していない。
そのため、冗長なコードになっている。

package com.company;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

/**
 * メイン
 */
public class Main {
    /**
     * 関数オブジェクト
     * 与えられたString str, String numを","で分割し、
     * numをInteger型に変更して、KV<String, Integer>型にする
     */
    static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
        @ProcessElement
        // ProcessContextは、inputを表すobject
        // 自分で定義しなくてもBeam SDKが勝手に取ってきてくれる
        public void processElement(ProcessContext c) {
            // ","で分割
            String[] words = c.element().split(",");
            // 分割したword[0]をKに、words[1]をIntegerに変換してVにする
            c.output(KV.of(words[0], Integer.parseInt(words[1])));
        }
    }


    /**
     * 関数オブジェクト
     * KV<String, Iterable<Integer>型をString型に変更する
     */
    static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Integer>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // inputをString型に変換する
            c.output(String.valueOf(c.element()));

        }

    }


    /**
     * インプットデータのパス
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     * アウトデータのパス
     */
    private static final String COMBINE_OUTPUT_FILE_PATH = "./src/main/resources/combine_result/result.csv";

    /**
     * メイン
     * 理解のため、メソッドチェーンを極力使用していない
     * そのため、冗長なコードになっている
     *
     * @param args 引数
     */
    public static void main(String[] args) {
        // まずPipelineに設定するOptionを作成する
        // 今回は、ローカルで起動するため、DirectRunnerを指定する
        // ローカルモードでは、DirectRunnerがすでにデフォルトになっているため、ランナーを設定する必要はない
        PipelineOptions options = PipelineOptionsFactory.create();

        // Optionを元にPipelineを生成する
        Pipeline pipeline = Pipeline.create(options);

        // inout dataを読み込んで、そこからPCollection(パイプライン内の一連のデータ)を作成する
        PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));

        // 与えられたString str, String numを","で分割し、numをInteger型に変更して、KV<String, Integer>型にする
        PCollection<KV<String, Integer>> kvCounter = lines.apply(ParDo.of(new SplitWordsAndMakeKVFn()));

        // Combine PerKey は、オペレーションの一部として GroupByKey 変換を実行する
        PCollection<KV<String, Integer>> sumPerKey = kvCounter
                .apply(Sum.integersPerKey());
        
        // PCollectionをファイル出力可能な形に変換する
        PCollection<String> output = sumPerKey.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()));

        // 書き込む
        output.apply(TextIO.write().to(COMBINE_OUTPUT_FILE_PATH));

        // run : PipeLine optionで指定したRunnerで実行
        // waitUntilFinish : PipeLineが終了するまで待って、最終的な状態を返す
        pipeline.run().waitUntilFinish();
    }


}

実施にコードを書いてみた(メソッドチェーンを使ったver)

だいぶすっきりした

package com.company;

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Sum;
import org.apache.beam.sdk.values.KV;

/**
 * メイン
 */
public class Main {
    /**
     * 関数オブジェクト
     * 与えられたString str, String numを","で分割し、
     * numをInteger型に変更して、KV<String, Integer>型にする
     */
    static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
        @ProcessElement
        // ProcessContextは、inputを表すobject
        // 自分で定義しなくてもBeam SDKが勝手に取ってきてくれる
        public void processElement(ProcessContext c) {
            // ","で分割
            String[] words = c.element().split(",");
            // 分割したword[0]をKに、words[1]をIntegerに変換してVにする
            c.output(KV.of(words[0], Integer.parseInt(words[1])));
        }
    }


    /**
     * 関数オブジェクト
     * KV<String, Iterable<Integer>型をString型に変更する
     */
    static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Integer>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // inputをString型に変換する
            c.output(String.valueOf(c.element()));

        }

    }


    /**
     * インプットデータのパス
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     * アウトデータのパス
     */
    private static final String COMBINE_OUTPUT_FILE_PATH = "./src/main/resources/combine_result/result.csv";

    /**
     * メイン
     * @param args 引数
     */
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());
        pipeline
                .apply(TextIO.read().from(INPUT_FILE_PATH))
                .apply(ParDo.of(new SplitWordsAndMakeKVFn()))
                .apply(Sum.integersPerKey())
                .apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()))
                .apply(TextIO.write().to(COMBINE_OUTPUT_FILE_PATH));
        pipeline.run().waitUntilFinish();
    }


}

読み込んだファイル

Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6

実行結果

以下の3つのファイルが生成される。
result.csv-00000-of-00003
result.csv-00001-of-00003
result.csv-00002-of-00003

それぞれのファイルの中身は、以下。
分散並列処理で処理が行われているので、どの内容がどのファイルに出力されるかは毎回ランダムである。

result.csv-00000-of-00003

KV{Python, 13}

result.csv-00001-of-00003

KV{Java, 6}

result.csv-00002-of-00003

KV{Go, 17}

関連記事

IntelliJとGradleで始めるApache Beam 2.0.x with Google Cloud Dataflow - Qiita

Apache Beam with Cloud Dataflow(over 2.0.0系)入門~基本部分~ParDoまで~ - Qiita

Apache Beam with Google Cloud Dataflow(over 2.0.x系)入門~基本的なGroupByKey編~ - Qiita

参考にさせていただいたサイト

Beam Programming Guide コレクションと値の結合 | Cloud Dataflow のドキュメント | Google Cloud Platform API リファレンス

Apache Beam with Google Cloud Dataflow(over 2.0.x系)入門~基本的なGroupByKey編~

Apache Beam with Google Cloud Dataflow(over 2.0.x系)入門~基本的なGroupByKey編~

Apache Beamの5つのCore Transformの内の1つ、GroupByKeyの基本的な使い方について記す。
CoGroupByKeyなどについては別の機会に書けたらなと思う。

Apache Beam や Cloud Dataflowの基本についてはこちら

公式のBeam Programming Guideを参考に書かせていただいている。

GroupByKeyとは

並列なreduction操作。
Map/Shuffle/Reduce-styleでいうところのShuffle。
GroupByKeyは、簡単に言うとその名の通り「KeyによってCollectionをGroup化する」Core Transform。
Keyは同じだが、valueが異なるペアが複数存在するKey-ValueなCollectionを結合して新しいCollectionを生成する。
共通なKeyを持っているデータを集約するのに役に立つ。

multimapとuni-map

multimap

例えば、Java, Python, GoというKeyがあったとする。
その複数の各Keyに各々Valueが数字で割り当てられている。
変換前のこのMapをmultimapという。

Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6

uni-map

上記のKey-ValueなmultimapのCollectionに対してGroupByKeyを適用すると以下のような結果が得られる。

Java [1, 6, 8]
Python [2, 7]
Go[7, 8]

変換後このMapをuni-mapと呼ぶ。 一意のJava, Python, GoというKeyに対して、数字のCollectionのMapが割り当てられている。

Beam SDK for Java特有のKey-Valueの表し方

Beam SDK for Javaでは、通常のJavaとは異なるKey-Valueの表し方をする。
KV<K, V>という型でkey-valueのオブジェクトを表す。

実際にコードを書いてみた

読み込むファイル

Java,1
Python,5
Go,1
Java,3
Java,2
Go,5
Python,2
Go,2
Go,9
Python,6

実際のJavaのコード

各処理は、コードにコメントとして記載している。
理解を優先するため、メソッドチェーンを極力使用していない。
そのため、冗長なコードになっている。

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;


/**
 * メイン
 * Created by sekiguchikai on 2017/07/12.
 */
public class Main {
    /**
     * 関数オブジェクト
     * 与えられたString str, String numを","で分割し、
     * numをInteger型に変更して、KV<String, Integer>型にする
     */
    static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
        @ProcessElement
        // ProcessContextは、inputを表すobject
        // 自分で定義しなくてもBeam SDKが勝手に取ってきてくれる
        public void processElement(ProcessContext c) {
            // ","で分割
            String[] words = c.element().split(",");
            // 分割したword[0]をKに、words[1]をIntegerに変換してVにする
            c.output(KV.of(words[0], Integer.parseInt(words[1])));
        }
    }

    /**
     * 関数オブジェクト
     * KV<String, Iterable<Integer>型をString型に変更する
     */
    static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Iterable<Integer>>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // inputをString型に変換する
            c.output(String.valueOf(c.element()));

        }

    }


    /**
     * インプットデータのパス
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     * アウトデータのパス
     */
    private static final String OUTPUT_FILE_PATH = "./result.csv";

    /**
     * メイン
     * 理解のため、メソッドチェーンを極力使用していない
     * そのため、冗長なコードになっている
     *
     * @param args 引数
     */
    public static void main(String[] args) {
        // まずPipelineに設定するOptionを作成する
        // 今回は、ローカルで起動するため、DirectRunnerを指定する
        // ローカルモードでは、DirectRunnerがすでにデフォルトになっているため、ランナーを設定する必要はない
        PipelineOptions options = PipelineOptionsFactory.create();

        // Optionを元にPipelineを生成する
        Pipeline pipeline = Pipeline.create(options);

        // inout dataを読み込んで、そこからPCollection(パイプライン内の一連のデータ)を作成する
        PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE_PATH));

        // 与えられたString str, String numを","で分割し、numをInteger型に変更して、KV<String, Integer>型にする
        PCollection<KV<String, Integer>> kvCounter = lines.apply(ParDo.of(new SplitWordsAndMakeKVFn()));

        // GroupByKeyで、{Go, [2, 9, 1, 5]}のような形にする
               // GroupByKey.<K, V>create())でGroupByKey<K, V>を生成している
        PCollection<KV<String, Iterable<Integer>>> groupedWords = kvCounter.apply(
                GroupByKey.<String, Integer>create());

        // 出力のため、<KV<String, Iterable<Integer>>>型からString型に変換している
        PCollection<String> output = groupedWords.apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()));

        // 書き込む
        output.apply(TextIO.write().to(OUTPUT_FILE_PATH));

        // run : PipeLine optionで指定したRunnerで実行
        // waitUntilFinish : PipeLineが終了するまで待って、最終的な状態を返す
        pipeline.run().waitUntilFinish();
    }
}

ちなみにメソッドチェーンを使うとこんな感じ。
だいぶすっきりした。

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;


/**
 * メイン
 * Created by sekiguchikai on 2017/07/12.
 */
public class Main {
    /**
     * 関数オブジェクト
     * 与えられたString str, String numを","で分割し、
     * numをInteger型に変更して、KV<String, Integer>型にする
     */
    static class SplitWordsAndMakeKVFn extends DoFn<String, KV<String, Integer>> {
        @ProcessElement
        // ProcessContextは、inputを表すobject
        // 自分で定義しなくてもBeam SDKが勝手に取ってきてくれる
        public void processElement(ProcessContext c) {
            // ","で分割
            String[] words = c.element().split(",");
            // 分割したword[0]をKに、words[1]をIntegerに変換してVにする
            c.output(KV.of(words[0], Integer.parseInt(words[1])));
        }
    }

    /**
     * 関数オブジェクト
     * KV<String, Iterable<Integer>型をString型に変更する
     */
    static class TransTypeFromKVAndMakeStringFn extends DoFn<KV<String, Iterable<Integer>>, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            // inputをString型に変換する
            c.output(String.valueOf(c.element()));

        }

    }


    /**
     * インプットデータのパス
     */
    private static final String INPUT_FILE_PATH = "./sample.txt";
    /**
     * アウトデータのパス
     */
    private static final String OUTPUT_FILE_PATH = "./result.csv";

    /**
     * メイン
     * 理解のため、メソッドチェーンを極力使用していない
     * そのため、冗長なコードになっている
     *
     * @param args 引数
     */
    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.create());

        // メソッドチェーンを使った書き方
        pipeline.apply(TextIO.read().from(INPUT_FILE_PATH))
                .apply(ParDo.of(new SplitWordsAndMakeKVFn()))
                .apply(GroupByKey.<String, Integer>create())
                .apply(ParDo.of(new TransTypeFromKVAndMakeStringFn()))
                .apply(TextIO.write().to(OUTPUT_FILE_PATH));

        // run : PipeLine optionで指定したRunnerで実行
        // waitUntilFinish : PipeLineが終了するまで待って、最終的な状態を返す
        pipeline.run().waitUntilFinish();
    }
}

実行結果

以下の3つのファイルが生成される。
result.csv-00000-of-00003
result.csv-00001-of-00003
result.csv-00002-of-00003

それぞれのファイルの中身は、以下。
分散並列処理で処理が行われているので、中身が空白のファイルや、中身が1つ、2つのものがあったりとバラバラである。
また、どの内容がどのファイルに出力されるかは毎回ランダムである。

result.csv-00000-of-00003
中身なし

result.csv-00001-of-00003

KV{Java, [1, 3, 2]}

result.csv-00002-of-00003

KV{Go, [5, 2, 9, 1]}
KV{Python, [5, 2, 6]}

関連記事

Apache Beam with Cloud Dataflow(over 2.0.0系)入門~基本部分~ParDoまで~ - Qiita

IntelliJとGradleで始めるApache Beam 2.0.x with Google Cloud Dataflow - Qiita

参考にさせていただいたサイト

Beam Programming Guide

GroupByKey と結合  |  Cloud Dataflow のドキュメント  |  Google Cloud Platform

※ Qiitaでも同一の投稿を行っている

Circle CI 2.0の基礎的な設定まとめてみた(GAE/Goのサンプル付き)

Circle CI 2.0の基礎的な設定まとめてみた(GAE/Goのサンプル付き)

今回の記事について

Circle CI2.0用の設定の基礎的な部分のメモ。

今回の記事は、単純な設定をCircle CI 2.0 で行うことを目的としているため、基礎的な設定のみを行い、Workflow等の設定は行わない。
※ Workflowの記事は別で書きたいと思っている。
また、サンプルとして、GAE/Goの設定を行う。
別にCircle CI1.0のことを知らなくても読めるはず。
Circle CI 1.0とCircle CI 2.0の機能的な違いなどについては、ここでは行わないので、以下を参考にするといいと思われる。

CircleCI 2.0に移行して新機能を活用したらCIの実行時間が半分になった話 - クラウドワークス エンジニアブログ

CircleCI 2.0を使うようにするだけで、こんなに速くなるとは夢にも思わなかった! | Tokyo Otaku Mode Blog

基本的な設定の項目はまとめるが、全部ではないので足りないところについては公式ドキュメントを参照されたい。

公式ドキュメントをかなり参考にさせていただいた。

実際の設定

まずは、実際の設定ファイルを見ていく方がわかりやすいと思うので、まずは設定ファイルを以下に記述する。

version: 2 # バージョン2を指定する
jobs:
 build: # Goのbuildとテストを行う
  docker: # ベースとなるDocker imageを指定
   - image: circleci/golang:1.8 # Dockefileのパスを指定(Go1.8を指定)
  environment:
   TZ: /usr/share/zoneinfo/Asia/Tokyo # Time Zoneを指定
  working_directory: /home/circleci/go/src/project # コード実行場所 以下のstepsはworking_directoryで実行される
  steps: # ローカルでも必要なものはshell scriptと言う感じで行う
   - checkout # working_directoryにcheckout
   - run: # command lineのプログラムを発動させる
      name: Set PATH to .bashrc. # runには名前をつけることができる
      command: | # 実際のコマンド 複数行に場合は、 `|` をつける
       echo 'export PATH=$HOME/go/bin:$HOME/go_appengine:$PATH' >> $BASH_ENV  # $BASH_ENVはデフォルトで入っている
       source /home/circleci/.bashrc
   - run:
      name: Make GOPATH directory. # GOPATHを指定するディレクトリを作成
      command: mkdir -p $HOME/go/src
   - run:
      name: Set GOPATH to .bashrc. # .bashrcにGOPATHを追加
      command: |
       echo 'export GOPATH=$HOME/go' >> $BASH_ENV  # $BASH_ENVはデフォルトで入っている
       source /home/circleci/.bashrc
   - run:
      name: Install appengine sdk. # appengine SDKをインストール
      command: |
       wget https://storage.googleapis.com/appengine-sdks/featured/go_appengine_sdk_linux_amd64-1.9.58.zip
       unzip go_appengine_sdk_linux_amd64-1.9.58.zip -d $HOME
   - run:
      name: Execute setup. # ここでセットアップ用のshellを実行
      command: PATH/TO/setup.sh
   - run:
      name: Run Server Unit Tests. # ここでユニットテストを実行するshellを実行
      command: PATH/TO/test.sh

Circle CI 2.0の各項目の説明

設定ファイル

Circle CI 2.0では、設定ファイルを従来の circle.yml ではなく、 .circleci/config.yml に記述することになっている。

version

Circle CI 2.0を使用する場合は、versionと言うkeyを .circleci/config.yml の先頭に記述し、valueとして、2を指定する。

jobs

mapで表現する。以降で説明する各jobの集合。

docker

dockerの設定を行う。
色々な項目があり、全部は記述しないので、その他の項目や詳しいことは公式ドキュメントを参照されたい。

image

Circle CIで使用するCustom Docker Image。
複数指定することが出来るが、最初に設定したDocker ImageがDocker executorを使用して、各jobを実行するのに使用されるPrimary Containerとなる。

environment

各変数はここで宣言する。

TZ

Docker imageのTime Zoneは、environmentにTZと言う変数で指定する。

working_directory

後述するstepsを実行する場所を指定する。

branches

Git hubなどのbranchのルールを指定する。
onlyとignoreを指定することができる。
onlyとignoreが同一 .circleci/config.yml に存在する場合に関しては、ignoreのみが適用される。

only

onlyを宣言した後のbranch名のリストにあるbranchのみCircle CIが実行されるようになる。

ignore

ignoreを宣言した後のbranch名のリストにあるbranchを無視してCircle CIが実行されるようになる。

Workflowを使用する場合は、個別のjobにbranchを記述しない。

steps

実行されるstepのリスト。
key/valueのmapで表現する。
色々な項目があり、全部は記述しないので、その他の項目や詳しいことは公式ドキュメントを参照されたい。

checkout

設定されたpathにソースコードをcheck outする。
デフォルトでは、 working_directory がそのpathになる。

run

command lineプログラムを実行する。
以下のような方法が基本的な書き方。

-run 
  command: コマンド

以下のように name としてcommandに名前をつけることもできる。

-run 
    name:
  command: コマンド

この場合は、CircleCI UIで表示される時にcommandがこの名前で表示される。
そうじゃない場合はフルのcommandが表示れるので、 name をつけた方がわかりやすくていいと思う。

複数行に渡る command| をつけて、以下のように記述する。

-run 
    name:
  command: |
  コマンド1行目
  コマンド2行目

その他の注意点

Circle CI 1.0では、 environment に指定すればCircle CIがよしなにやってくれていたが、Circle CI 2.0ではそうはいかない。
Circle CI 2.0では、自分で command で、.bashrc に設定する必要がある。
サンプルにある BASH_ENV というのはCircle CI 2.0がデフォルトでexportしているため、自前でenvironmentで宣言する必要はない。
以下を参考にさせていただいた。
How to add a path to PATH in Circle 2.0? - CircleCI 2.0 / 2.0 Support - CircleCI Community Discussion

GAE/Goのサンプルの流れの説明

GAE/Goの流れは以下のようになっている。(stepsの部分のみ説明)
1. .bashrcにPATHを設定する
2. GOPATH用のディレクトリを作成する
3. 作成したGOPATH用のディレクトリを.bashrcにGOPATHとして設定
4. appengine SDKをインストールして、解凍
5. その他の設定用のshellを実行
6. テスト用のshellを実行

Circle CIから別の環境に乗り換えることも考慮して、appengine SDK以外の設定に関しては、shellで行うようにしている。

所感

書きやすくなった。

参考にさせていただいたサイト

公式ドキュメント

Circle CI2.0

circleci2.0でおもにgoをCIする

CircleCI 2.0に移行して新機能を活用したらCIの実行時間が半分になった話 - クラウドワークス エンジニアブログ

How to add a path to PATH in Circle 2.0? - CircleCI 2.0 / 2.0 Support - CircleCI Community Discussion

CircleCI 2.0を使うようにするだけで、こんなに速くなるとは夢にも思わなかった! | Tokyo Otaku Mode Blog

Qiitaでも同一の投稿を行っている。

CSSの設計方法をまとめてみた~SUIT CCS編~(Angularによるサンプル付き)

CSSの設計方法をまとめてみた~SUIT CCS編~

SUIT CSSについて、簡単にまとめる。
また、Angularを使用した簡単なサンプルも書いてみる。
なお、本記事は、公式ドキュメントをかなり参考にさせていただいている。

詳細な部分に関しては以下の記事がわかりやすかったので、参考にされると良いと思う。
suit/design-principles.md at master · suitcss/suit suit/naming-conventions.md at master · suitcss/suit

SUIT CSSとは

Componentベースのcssの方法論。
Componentベースなので、AngularやReactなどのComponent指向なJavaScriptフレームワークやライブラリと相性が良いそう。

SUIT CSSのメリット

各々のユニットの結びつきを緩くして、独立したものにすることができる。

SUIT CSS の設計原則

ここで使用している英文は以下から引用させていただいている。
suit/design-principles.md at master · suitcss/suit

Modularity(モジュール性)

Each component should have a single focus and contain everything necessary to realise a specific part of the UI.

【意訳】
各Componentは、1つの事柄に集中して、また、UIの特定のパーツを実現するのに必要なすべてのものを含んでいるべきである。

Componentは、HTML、CSSJavaScriptを含んでいる。

=> AngularのComponentのようなもの。

Cohesion(結束)

The functionality and presentation defined by a component must be semantically related.
Components do not have direct influence over each other.

【意訳】
Componentによって、定義される機能性と表現は、意味的に結びついていなければならない。
Componentは、違いに直接的な影響を与えない。

Composable and configurable(組み立て可能で、設定可能である)

必要に応じて、Componentを組み合わせ可能であること。

Configuration is done via interfaces that are provided and used by components.

【意訳】
Componentによって提供され、利用されるinterfaceを通して、設定が行われる。

Loose coupling(弱い結びつき)

Component間の依存関係は、直接Component同士であれこれするのではなく、interfaceとeventでどうにかしましょう。

Soft encapsulation(ソフトなカプセル化)

Componentを他のComponentからカプセル化して、他のComponentから直接Component内のコードを利用できなくする必要がある。

Documentation(文書化)

各Componentの役割やCSSプロパティがなぜ必要かを丁寧に文書化する。

SUIT CSS命名規則

SUIT CSSでは、意味のあるハイフンと構造化したclassを使用する。
SUIT CSSでは、大きく分けて、UtilitiesとComponentsと概念が存在する。
繰り返し使うようなCSSのプロパティに対して、適用すると何回もCSSを記述しなくても良くなる。

Utilities

Utilitiesは、低レベルの構造的また位置の特徴を表す。
Componentの中で、どの要素にも直接適用できる。
繰り返し使うようなCSSのプロパティに対して、適用すると何回もCSSを記述しなくても良くなる。

記述方法

基本的には u-utilityName で記述する。
utilityNamecamel caseで記述する。
レスポンシブデザインを考慮した書き方もでき、その場合は u-size-utilityName で記述する。
sizeは sm = small、 md = medium、 lg = large Mediaのいずれかを適用する。

記述例

以下のように記述する。

<p class="u-floatLeft">hoge</p>

Components

記述方法

namespace-ComponentName-descendentName--modifierName というふうに記述する。

namespace(任意)

必要に応じて、namespaceというプレフィックスをつけることができる。
これによって、自作の部分とライブラリの区別がつきやすくなったりする。
<div class="namespace-Component">hoge<div> という形で記述する。
.namespace-Component{...} という形でstyleを適用する。

ComponentName

Componentの名前。
pascal case(アッパーキャメルケース)で記述する。
<div class="ComponentName">hoge<div> という形で記述する。
.ComponentName{...} という形でstyleを適用する。

ComponentName–modifierName

ベースとなるComponentの表現を変更するためのclass。
camel caseで記述する。
ComponentName–modifierNameは、ベースとなるに追加してHTML上に書く。
<div class="ComponentName ComponentName--modifierName">hoge<div> という形で記述する。
以下の形でstyleを適用する。

.ComponentName{...}
.ComponentName--modifierName{...}
ComponentName-descendentName

Componentの子要素のようなもの。
Componentの子要素に直接、cssの表現を適用する。
<div class="ComponentName-descendentName">hoge<div> という形で記述する。
.ComponentName-descendentName{...} という形でstyleを適用する。

ComponentName.is-stateOfComponent

Componentの状態の変更を反映するのに使用する。
camel caseで記述する。
state nameはいろんなところで使い回すけども、各Componentによってstateに当てるstyleは異なるので、直接このstate nameにstyleを当ててはいけない。
<div class="ComponentName is-stateOfComponent">hoge<div> という形で記述する。
以下の形でstyleを適用する。

.Compomnent{}
.Compomnent.is-stateOfComponent{}

という感じでstyleを適用する。

実際に書いてみた

実際にComponent指向のjavaScriptフレームワークであるAngularで実装してみた。

実際のコード

app.component.ts

import {Component} from '@angular/core';

@Component({
  selector: 'app-root',
  templateUrl: './app.component.html',
  styleUrls: ['./app.component.scss', './utilities.scss']
})
export class AppComponent {
  contentsTitle = 'SUIT CSS!';
  contentsHeading = 'design principles';
  designPrinciples = ['Modularity', 'Cohesion', 'Composition and configuration', 'Loose coupling', 'Soft encapsulation', 'Documentation'];
  clazz = 'AppComponent is-coloredBlack';

  /**
   * 文字を赤くする
   */
  changeColorToRed() {
    this.clazz = 'AppComponent is-coloredRed';
  }

}

app.component.html

<!--Component Name-->
<article class="AppComponent">
  <!--ComponentName-descendentName-->
  <header class="AppComponent-header">
    <h1>{{contentsTitle}}</h1>
  </header>
  <!--ComponentName-descendentName-->
  <div class="AppComponent-content">
    <!--ComponentName.is-stateOfComponent-->
    <h1 [class]="clazz" (click)="changeColorToRed()">{{contentsHeading}}</h1>
    <!--utilities-->
    <ul class=" u-listStyleNone">
      <li *ngFor="let p of designPrinciples">
        {{p}}
      </li>
    </ul>
  </div>
</article>

app.component.scss

/*AppComponentに対するstyle*/
.AppComponent {
  width: 100%;
  .AppComponent-header {
    color: #0000ed;
  }
  .AppComponent-content {
    color: #3A3A3A;
  }
  .is-coloredBlack {
    color: black;
  }
  .is-coloredRed {
    color: red;
  }
}

app.component.scssからemitされたapp.component.css

@charset "UTF-8";
/*AppComponentに対するstyle*/
.AppComponent {
  width: 100%;
}

.AppComponent .AppComponent-header {
  color: #0000ed;
}

.AppComponent .AppComponent-content {
  color: #3A3A3A;
}

.AppComponent .is-coloredBlack {
  color: black;
}

.AppComponent.is-coloredRed {
  color: red;
}

utilities.scss

/*listの点を消す*/
.u-listStyleNone {
  list-style: none;
}

utilities.scssからemitされたutilities.css

@charset "UTF-8";
/*listの点を消す*/
.u-listStyleNone {
  list-style: none;
}

実装結果

クリック前
suitccs-before-click.png

クリック後

suitccs-after-click.png

関連記事

CSSの設計方法をまとめてみた~BEM編~ - Qiita

所感

Component指向が根底にある設計方法なので、Angularなどの現代的なComponentベースのフレームワークと相性がよくて、使いやすそう。
Component指向のフレームワークを使っている人にはすごく馴染みやすいと思う。
また、システマチックな感じに考えられているところがよかった。

参考にさせていただいたサイト

suit/doc at master · suitcss/suit

キャメルケース - Wikipedia

HTML5のお勉強 articleとsectionとか - Qiita

[HTML5] 新要素まとめ【2014/2/14版勧告候補】 - Qiita

Qiitaでも同一の投稿を行っている。

CSSの設計方法をまとめてみた~BEM編~

CSSの設計方法をまとめてみた~BEM編~

BEMについて、簡単にまとめる。
詳細な部分に関しては以下の記事がわかりやすかったので、参考にされると良いと思う。

bem-methodology-ja/index.md at master · juno/bem-methodology-ja

BEMという命名規則とSass 3.3の新しい記法 - アインシュタインの電話番号

BEMとは

Block、Element、Modifierの略。
DOMを構成する各要素をBlock、Element、Modifierのどれかに当てはめて命名する。
正直なところCSSのclass名は冗長になるので最初は冗長に感じるかもしれないが、がっつり命名規則が決まるので段々と楽に感じてくるかもしれない。

BEMのメリット

BEMの公式ドキュメントでは、以下の問題を解決するとしている。
なお、以下の英語の部分はBEMの公式ドキュメントから引用しているものである。

BEM PROVIDES THE SAME RULES TO ACHIEVE CODE CONSISTENCY

【意訳】
BEMはコードの一貫性を獲得するための同一ルールを提供する。

Common approach for all technologies: HTML, CSS, JavaScript, docs, tests, etc.

【意訳】
HTML, CSS, JavaScript, docs, testsなどのあらゆる技術に対しての共通のアプローチである。

GROW AND SCALE YOUR CODEBASE

【意訳】
(コードベースが)育ち、あなたのコードベースをスケールさせる。

Most projects use the same components. Code reuse significantly reduces price and time of development.

【意訳】
大半のプロジェクトは、同じコンポーネーネント(部品や構成要素)を使用する。
コードの再利用によって、開発のコストと開発にかかる時間が著しく減少する。

INCREASE PRODUCTIVITY

【意訳】
生産性を上げる。

Simplicity of updates and scalability increases productivity.

【意訳】
アップデートと拡大の容易さは、生産性を上げる。

TEAM WORK

【意訳】
チームワーク

Common terminology provides ability for developers to rapidly switch projects — everything is familiar.

【意訳】
共通した専門用語は、開発者がプロジェクトに(新規に)加わるのにやくに立つ。

DO LESS, GET MORE

【意訳】
することは少ないが、得るものは多い。

Common rules help to automate process. Code may be partially autogenerated.

【意訳】
共通のルールによって、プロセスを自動化が促進される。
コードが部分的に自動生成されるようになるかもしれない。

SUITABLE FOR ANY PROGRAMMING LANGUAGE OR ANY FRAMEWORK

【意訳】
あらゆるプログラミン言語及びフレームワークに適している。

Methodology provides language agnostic practices to increase code reliability and reuse.

【意訳】
方法論は、言語にとらわれないコードの確実性と再利用を促進する習慣を提供する。

EASY TO LEARN

【意訳】
学習しやすい。

You can read all the methodology during your morning coffee.

【意訳】
朝コーヒーを飲んでいる間に全ての方法論を読める(くらいに学習が容易だ)。

PROMOTE REUSE

【意訳】
再利用を促進する。

Code grows following predefined rules.

【意訳】
あらかじめ定義されたルールに基づいて、コードが成長していく。

BEMのデメリット

class名が冗長になる。

Block、Element、Modifierの各説明

Block

アプリケーションやWEBサイトを構成する塊。
ページを構成する大きめの部品だと考えれば良いと思う。
ブロックは、自身の中に別のブロックを含む場合がある。
パソコンで例えると、画面やキーボード(一個一個ではなくて各ボタンの集合として)、トラックパッドなど。

Element

Blockの中に存在し、Blockを構成するための各要素だと考えれば良い。
パソコンで例えると、画面のベセルだったり、キーボードの中の各ボタン(AとかEnterなど)など。

Modifier

BlockやElementの中で、状態やレイアウトが変化するプロパティ(性質)。
名前と値のセットである。

記述方法

BlockElement–Modifier(name)_Modifier(value)と言う形で記述する。(Block(アンダースコア2つ)–Element(ハイフン2つ)Modifierの名前部分_(アンダースコア1つ)Modifierの名前部分)。
Block、Element、Modifier共に、名称が複数の単語から構成される場合は、 hoge-foo のような具合でハイフン1つでつなぐ。

SCSSと一緒に使う

SCSSの & とBEMはすごく相性が良いようだ。
& は、ブロックの親セレクタを参照する。

参考 BEMをSassで快適に書く方法 | maesblog

CSSの常識が変わる!「Compass」の基礎から応用まで! | 株式会社LIG

記述例

実際に、BEMを導入し、SCSSの方式で簡単なサンプルを記述してみる。

HTMLは以下。

<section class="block-A">
    <div class="block-B">
        <div class="block-B__element-B1 block-B__element-B1--modifier-B1">B1</div>
        <div class="block-B__element-B2 block-B__element-B2--modifier-B2">B2</div>
    </div>
    <div class="block-C">
        <div class="block-C__element-C1 block-C__element-C1--modifier-C1">C1</div>
        <div class="block-C__element-C2 block-C__element-C2--modifier-C2">C2</div>
    </div>
</section>

SCSSは以下。

.block-A {
  width: 100%;
  padding: 20%;
  .block-B {
    &__element-B1 {
      width: 50%;
      background-color: black;
      &--modifier-B1 {
        color: white;
      }
    }

    &__element-B2 {
      width: 50%;
      background-color: blue;
      &--modifier-B2 {
        color: yellow;
      }
    }
  }
  .block-C {
    &__element-C1 {
      width: 50%;
      background-color: aquamarine;
      &--modifier-C1 {
        color: orange;
      }
    }
    &__element-C2 {
      width: 50%;
      background-color: deeppink;
      &--modifier-C2 {
        color: green;
      }
    }
  }
}

SCSSを以下のコマンドでコンパイルして生成されたCSSファイルが以下。

コマンド

sass --style expanded bem.scss:bem.css

生成されたcss

.block-A {
  width: 100%;
  padding: 20%;
}
.block-A .block-B__element-B1 {
  width: 50%;
  background-color: black;
}
.block-A .block-B__element-B1--modifier-B1 {
  color: white;
}
.block-A .block-B__element-B2 {
  width: 50%;
  background-color: blue;
}
.block-A .block-B__element-B2--modifier-B2 {
  color: yellow;
}
.block-A .block-C__element-C1 {
  width: 50%;
  background-color: aquamarine;
}
.block-A .block-C__element-C1--modifier-C1 {
  color: orange;
}
.block-A .block-C__element-C2 {
  width: 50%;
  background-color: deeppink;
}
.block-A .block-C__element-C2--modifier-C2 {
  color: green;
}

ブラウザに表示される画面
bem.png

参考にさせていただいたサイト

bem-methodology-ja/index.md at master · juno/bem-methodology-ja

BEMという命名規則とSass 3.3の新しい記法 - アインシュタインの電話番号

BEMを参考にしたCSS設計 - Qiita

bem-methodology-ja/definitions.md at master · juno/bem-methodology-ja

BEM公式

BEMをSassで快適に書く方法 | maesblog

CSSの常識が変わる!「Compass」の基礎から応用まで! | 株式会社LIG

Qiitaでも同一の投稿を行っている。

RxJSの基本をまとめてみた~基本的な概念編(Observable、Observer、Subscriptionなど)~

RxJSの基本をまとめてみた~基本的な概念編~

Angularで使用していたRxJSだが、もっと深くちゃんと理解しようと思い学習したので、これから何回かに分けてまとめる。
仕組みを理解するために、ちょっと冗長な書き方をするところがある。
「これどうなの?」とか「意訳おかしくね?」という部分があったら、ご指摘いただけるとありがたいです。

今回の範囲

今回は、RxJSやリアクティブプログラミングの基本的な概念である、データストリーム、Observable、Observer、Subscription関連についてまとめたいと思う。
今回は、RxJSの公式ドキュメントをかなり参考にさせていただいた。

RxJS

RxJSとは

JavaScript向けのReactive Extensions ライブラリで、リアクティブプログラミングを行うためのもの。
Reactive-Extensions/RxJS: The Reactive Extensions for JavaScript RxJS API Document

リアクティブプログラミング

リアクティブプログラミングとは

リアクティブプログラミングについて様々な説明のされ方があるが、以下の説明が一番ピンときた。

リアクティブプログラミングとは、通知されてくるデータを受け取るたびに関連したプログラムが反応し(リアクション)して、処理を行うようにするプログラミングの考え方です。

引用元 : 須田智之 (2017/2/16)『RxJavaリアクティブプログラミング』 翔泳社

データストリーム

リアクティブプログラミングでは、あらかじめ用意されている固定長のデータだけではなく、随時発生するデータを処理することができる。
この随時データが生成されて、その都度流れてくるデータの流れをデータストリームという。

参考 : 「RxJS」初心者入門 - JavaScriptの非同期処理の常識を変えるライブラリ 須田智之 (2017/2/16)『RxJavaリアクティブプログラミング』 翔泳社

RxJSの基礎

ProducerとConsumer

Producer

データを生成し、そのデータを通知する責務を持つ。
RxJSではObservableがこれに当たる。

An Observable emits items or sends notifications to its observers by calling the observers’ methods.

引用元 : ReactiveX - Observable

【意訳】Observableは、アイテム(データなど)を排出し、observerのメソッドを呼び出すことによって、自身のobserverに通知を送信する。

Consumer

データを受信し、必要な処理を行う責任を持つ。
RxJSではObserverがこれに当たる。

In ReactiveX an observer subscribes to an Observable. Then that observer reacts to whatever item or sequence of items the Observable emits.

引用元 : ReactiveX - Observable

【意訳】ReactiveXでは、ObserverはObservableを購読する。さらに、ObserverはObservableが生成するいかなるアイテム(データなど)やアイテム(データなど)のシーケンスに対して反応する。

ObservableとObserverとオペレータとsubscribe

Observableがデータを生成し、データを通知する。

Observableがデータを生成し、Observerにデータを通知し、届けるまでに、様々な前処理を行うことができる。
この前処理をオペレータという。
オペレータを使って前処理をすると新しいObservableを生成することになる。
オペレータに関しては別で記事を書きたいと思っている。

そして、Observableシーケンス(データストリーム)から流れてくるデータをObserverがsubscribe(購読)する。その際に、受け取ったデータに関して、必要な処理を行う。

上記の処理の流れをエラーが出るか、完了するまで続ける。
エラーや完了については後述する。

ただ、実際のコードの中だと「Observerってどこで生成されているの!?」と思う ことが多々ある。
それは以下のような理由らしい。

Internally in observable.subscribe, it will create an Observer object using the first callback argument as the next handler. All three types of callbacks may be provided as arguments

引用元 : [RxJS Overview](http://reactivex.io/rxjs/manual/overview.html#subscribing-to-observables

【意訳】
observable.subscribe で内部的に、最初のコールバックの引数をnextハンドラーとして使用しているObserverのオブジェクトを生成する。3つのコールバックの全部のタイプが、引数として提供され得る。

Observerの3つのコールバック

Observers are just objects with three callbacks, one for each type of notification that an Observable may deliver.

引用元 : http://reactivex.io/rxjs/manual/overview.html#subscribing-to-observables

【意訳】
Observerは、Observableが届けてくるであろう各々のタイプの通知に対する3つのコールバックを持ったただのオブジェクトである。

Observerは、Observableをsubscribe(購読)し、受け取ったデータを処理する。
ここでいう3つのコールバックがその処理に当たる。
3つのコールバックとは、 nexterrorcomplete である。

next

Observableがデータを生成する度にObservableによって呼び出される。
Observableによって生成されたデータを引数に取る。

error

エラーが発生したことを通知する。
これが呼び出されると、 nextcomplete はそれ以上、呼び出されない。
引数には、エラーの原因が渡される。

complete

完了したことを通知する。
エラーがなかった場合、最後のnextメソッドが呼び出された後に、Observableによって呼び出される。

参考: ReactiveX - Observable

Subscription

A Subscription is an object that represents a disposable resource, usually the execution of an Observable. A Subscription has one important method, unsubscribe, that takes no argument and just disposes the resource held by the subscription.

引用元 : RxJS Overview

【意訳】 Subscriptionとは、破棄可能なリソースを表し、それは大抵Observableの実行を表すオブジェクトである。
Subscriptionは、引数を取らず、subscriptionが持っているリソースの破棄を行うだけのunsubscribeという重要なメソッドを1つ持っている。

=> データストリームであるObservableシケーンスが終わりがなく、随時生成されるデータだった場合に、そのデータストリームの実行状態を終わらせるのにSubscription#unsubscribeを使用するというわけである。

subscribeを呼び出した時にその戻り値として、Subscriptionが返却される。
以下のような感じで使う。

// observable.subscribeの戻り値として、subscriptionを返却
// observable.subscribeの内部的に、Observerのオブジェクトが生成されていている。
let subscription = observable.subscribe(val => console.log(val));
// 進行中の実行をキャンセル
subscription.unsubscribe();

実際にコードを書いてみた

実際に超基礎的なコードを書いてみた。 これまでにまとめてきた概念などを理解しやすくするために、かなり簡単なコードになっているし、普通はもっと合理的な書き方をした方が良い。
それぞれの処理が何をやっているかなどの細かい説明は、コードの中にコメントとして記述してある。

実際のコード

typescript2.5.2を使用している。

import {Observable} from "rxjs";

// Observable.createは、ObservableのコンストラクタのエイリアスでObservableのオブジェクトを生成する
// 引数に、subscribe関数を渡す
// Observableはcreateで作れるけど、基本的にcreateを使って、Observableを生成することはあまりない
// ofとかの別のoperatorから生成する
let observable = Observable.create(function subscribe(observer) {
    try { // 通常時
        // データを通知し、送信する
        // 引数は、実際にObserverに送信されるデータを表している。
        observer.next(0);
        observer.next(1);
        observer.next(2);
        observer.next(3);
        observer.next(4);
        observer.next(5);

        // 正常に完了したことを通知する
        // これが呼ばれた後にnext()しても意味がない
        observer.complete();
    } catch (err) { // 異常時
        // エラーを通知する
        observer.error(err);
    }
});
console.log('開始');
// observable.subscribeの内部的に、Observerのオブジェクトが生成されている
observable.subscribe(val => console.log(val));


console.log('終了');

実行結果

開始
0
1
2
3
4
5
終了

参考

参考文献

須田智之 (2017/2/16)『RxJavaリアクティブプログラミング』 翔泳社

参考にさせていただいたサイト

ReactiveX/rxjs: A reactive programming library for JavaScript
RxJS API Document
(http://reactivex.io/documentation/observable.html))
ReactiveX - Observable
「RxJS」初心者入門 - JavaScriptの非同期処理の常識を変えるライブラリ
http://reactivex.io/rxjs/manual/overview.html#subscribing-to-observables

Qiitaでも同一の記事を投稿している。

Angular4(over2~)のロケールとPipe

Angular4(over2~)のロケール

ローケルとは

ロケールとは、ソフトウェアに内蔵される、言語や国・地域ごとに異なる単位、記号、日付、通貨などの表記規則の集合。または単に、利用する言語や国・地域の指定。多くのソフトウェアやプログラミング言語は、使用する言語とともにロケールを設定し、ロケールで定められた方式に基づいてデータの表記や処理を行う。

引用元 : ロケール(ロカール)とは - IT用語辞典

つまり、各国・地域などによって異なる諸々の表記方法であり、今回はこれを設定する。

ローケルとPipe

AngularのPipeでは、このローケルの設定によって、処理結果が異なるものが多々存在する。

DatePipe

DatePipeもローケルの設定によって、処理結果が変わるPipeの1つである。
例えば、ロケールを設定しないで、以下のコードを実行する。

app.component.ts

import {Component} from '@angular/core';

@Component({
  selector: 'app-root',
  template: `<p class="today">
本日は、{{today | date}} だ
</p>
`,
  styles: [`
.today{background-color:yellow}
`]
})
export class AppComponent {
  today = Date.now();
}

すると、表示される画面は以下のようになる。
スクリーンショット 2017-08-30 13.01.34.png

Angularのアプリケーションにロケールを設定する

LOCALE_IDを使用し、アプリケーションにロケールを設定する。
LOCALE_IDをprovideする際にuseValueでLocaleIdを与えると、アプリケーションのロケールがLocaleIdで渡されたものに設定される。

実際にやってみた

今回は、ロケールを日本に設定してみる。
app.module.tsのproviderでLOCALE_IDをprovideする際にuseValueで与えるLocaleIdを js-JP (日本)にする。
実際のコードは以下。
app.module.ts

import {BrowserModule} from '@angular/platform-browser';
import {NgModule, LOCALE_ID} from '@angular/core';
import {AppComponent} from './app.component';

@NgModule({
  declarations: [
    AppComponent
  ],
  imports: [
    BrowserModule],
  providers: [{provide: LOCALE_ID, useValue: 'ja-JP'}],
  bootstrap: [AppComponent]
})
export class AppModule {
}

componentは先ほどと変わらず以下のようなものにする。
app.component.ts

import {Component} from '@angular/core';

@Component({
  selector: 'app-root',
  template: `<p class="today">
本日は、{{today | date}} だ
</p>
`,
  styles: [`
.today{background-color:yellow}
`]
})
export class AppComponent {
  today = Date.now();
}

実行結果

実行すると以下のようになり、ロケールが日本になったため、同じコードでもDataPipeで変換された後の値が変化していることがわかる。

スクリーンショット 2017-08-30 13.16.31.png

参考文献

山田 祥寛(2017/8/4)『Angularアプリケーションプログラミング』 技術評論社

参考にさせていただいたサイト

ロケール(ロカール)とは - IT用語辞典 Angular - Pipes

DatePipe

LOCALE_ID

Qiitaでも同一の投稿を行っている
Angular4(over2~)のロケールとPipe - Qiita