Fluentd, Cassandra, Spark and Tableau Pipeline

A belief is not merely an idea the mind possesses; it is an idea that possesses the mind.

Robert Oxton Bolt

Introduction

The purpose of this data pipeline is to clean, aggregate, map-reduce, and finally visualize unstructured data. Beyond that, you can use Spark SQL or the Cassandra APIs to build customized analytics tools, such as machine learning or other analytics integrations, depending on your business needs.

All the components in this pipeline have alternatives, such as Fluentd vs. Logstash, Cassandra vs. MongoDB, and Tableau vs. Superset.

Compared to Logstash, Fluentd uses fewer computing resources. With the built-in Fluentd UI plugin, you can monitor the performance of each td-agent. On the other hand, Logstash is more native to the Filebeat-Logstash-Elasticsearch-Kibana pipeline, and it becomes powerful when it leverages the X-Pack library.

In this pipeline, I chose Cassandra because it performs better under write-heavy workloads. Besides that, Cassandra is more compatible with Spark: the RDD (Resilient Distributed Dataset) concept is similar to the Cassandra data model, since both are partitioned across the cluster, as the small sketch below illustrates.
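
For instance, here is a minimal sketch (run from the spark-shell, assuming the Spark Cassandra connector that is set up later in this post and a hypothetical metrics.logs table) of how a Cassandra table surfaces in Spark as a partitioned RDD:

    import com.datastax.spark.connector._

    // cassandraTable() returns an RDD whose Spark partitions map to groups of
    // Cassandra token ranges, so the data stays distributed across the cluster
    val logs = sc.cassandraTable("metrics", "logs")
    println(logs.getNumPartitions)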

I will discuss how to use this pipeline for statistical analysis of finance data in another blog post.

Version

Spark (2.4.4)
Cassandra (3.11)
Fluentd (td-agent v3.5.1)

Installation and Configuration

In this blog, I used Ubuntu for installation and configuration. For a production build, you can use CI/CD tools for orchestration.

  • FluentD

    Download FluentD for Ubuntu Xenial:

    curl -L https://toolbelt.treasuredata.com/sh/install-ubuntu-xenial-td-agent3.sh | sh

    Start and check the status of the FluentD service:

    sudo systemctl start td-agent.service
    sudo systemctl status td-agent.service

    Check the configuration:

    cat /etc/td-agent/td-agent.conf

    In the configuration, you need a third-party plugin in order to output to Cassandra. This link, FluentD all Plugins, lists all the available plugins for Fluentd. The plugin used here is fluent-plugin-cassandra-json, because it lets you set authentication credentials. The downside is that the record field names and types emitted by Fluentd must exactly match the columns in your Cassandra schema; the target table must already exist (the Scala sketch at the end of the Cassandra section shows one way to create it):

    1. Installation

      apt-get install gcc make
      td-agent-gem install fluent-plugin-cassandra-json
    2. Configuration

      <match cassandra.**>
        type cassandra_json
        hosts 127.0.0.1
        port 9042
        username CHANGE_ME
        password CHANGE_ME
        keyspace metrics
        column_family logs
      </match>

      (Note: Inserting NULL values is not recommended, since it creates tombstones. Read this article about the downsides of creating tombstones in Cassandra: Cassandra Tombstone.)

  • Cassandra

    I am building a cluster that contains three nodes.
    (Note: for the Spark Cassandra connector, you may get an error when you set multiple hosts that are not in the same data center in the SparkConf. The error is: Contact points contain multiple data centers. See the SparkConf sketch at the end of this section.)

    1. Prerequisites

      Configure SSH

      cd ~
      apt-get install openssh-server openssh-client
      ssh-keygen -t rsa -P ""
      # repeat on all 3 nodes
      cp .ssh/id_rsa.pub .ssh/authorized_keys
    2. Cassandra Cluster Installation

      Installation from Debian packages

      echo "deb http://www.apache.org/dist/cassandra/debian 311x main" | sudo tee -a /etc/apt/sources.list.d/cassandra.sources.list
      curl https://www.apache.org/dist/cassandra/KEYS | sudo apt-key add -
      sudo apt-get update
      # If you get the error below:
      # GPG error: http://www.apache.org 311x InRelease: The following signatures couldn't be verified because the public key is not available: NO_PUBKEY A278B781FE4B2BDA
      # Run: sudo apt-key adv --keyserver pool.sks-keyservers.net --recv-key A278B781FE4B2BDA
      sudo apt-get install cassandra

      Check the service status and the main directories

      sudo service cassandra start
      sudo service cassandra stop
      nodetool status
      # configuration directory
      /etc/cassandra
      # log directory
      /var/log/cassandra/
      # data directory
      /var/lib/cassandra
    3. Cassandra Cluster Configuration

      Edit the YAML file and set the following authentication and authorization options

      vi /etc/cassandra/cassandra.yaml
      # authenticator: org.apache.cassandra.auth.PasswordAuthenticator
      # authorizer: org.apache.cassandra.auth.CassandraAuthorizer
      # role_manager: CassandraRoleManager
      # roles_validity_in_ms: 0
      # permissions_validity_in_ms: 0

      Create Super User

      cqlsh -u cassandra -p cassandra
      cassandra@cqlsh> CREATE ROLE [new_superuser] WITH PASSWORD = '[secure_password]' AND SUPERUSER = true AND LOGIN = true;
      superuser@cqlsh> ALTER ROLE cassandra WITH PASSWORD = 'cassandra' AND SUPERUSER = false AND LOGIN = false;
      superuser@cqlsh> REVOKE ALL PERMISSIONS ON ALL KEYSPACES FROM cassandra;
      superuser@cqlsh> GRANT ALL PERMISSIONS ON ALL KEYSPACES TO [new_superuser];
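
    Below is a minimal Scala sketch, not part of the original setup steps, that ties the pieces above together once the Spark Cassandra connector from the next section is on the classpath: it points SparkConf at contact points from a single data center (avoiding the error mentioned at the top of this section), authenticates with the superuser created above, and creates a metrics.logs table of the kind the Fluentd plugin writes to. The host IPs and the column names/types are illustrative placeholders; they must match your own nodes and your Fluentd record fields.

      import org.apache.spark.SparkConf
      import com.datastax.spark.connector.cql.CassandraConnector

      val conf = new SparkConf()
        .setAppName("pipeline-setup")
        // list nodes from one data center only, otherwise the connector fails with
        // "Contact points contain multiple data centers"
        .set("spark.cassandra.connection.host", "10.0.0.1,10.0.0.2,10.0.0.3") // placeholder IPs
        .set("spark.cassandra.auth.username", "new_superuser")                // role created above
        .set("spark.cassandra.auth.password", "secure_password")

      CassandraConnector(conf).withSessionDo { session =>
        // keyspace and table that fluent-plugin-cassandra-json will write into;
        // the columns are examples and must match your Fluentd record exactly
        session.execute(
          "CREATE KEYSPACE IF NOT EXISTS metrics " +
          "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 3}")
        session.execute(
          "CREATE TABLE IF NOT EXISTS metrics.logs (id uuid PRIMARY KEY, message text, ts timestamp)")
      }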
  • Spark

    The Spark cluster will contain three instances.

    1. Prerequisites

      Install Java and Scala

      sudo add-apt-repository ppa:webupd8team/java
      sudo apt-get update
      sudo apt-get install oracle-java8-installer
      sudo apt-get install scala
    2. Spark Cluster Installation
      • Download Spark from the suggested mirror and set up the environment for Spark

        # Download and install
        wget http://ftp.wayne.edu/apache/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
        tar xvf spark-2.4.4-bin-hadoop2.7.tgz
        mv spark-2.4.4-bin-hadoop2.7 /opt/spark
        # Set up the environment
        vi ~/.bashrc
        # Add the line below to the file
        export PATH=$PATH:/opt/spark/bin
        source ~/.bashrc
      • Setup Cluster

        • Set the environment config (Master only)

          cd /opt/spark/conf
          cp spark-env.sh.template spark-env.sh
          vi spark-env.sh
        • Edit the environment config (Master only)

          export SPARK_MASTER_HOST='<MASTER-IP>'
          export JAVA_HOME=<Path_of_JAVA_installation>
        • Set the workers config (Master only)

          cd /opt/spark
          vi conf/slaves
          # Add the lines below to the file
          master
          slave01
          slave02
      • Start Spark Cluster

        cd /opt/spark
        ./sbin/start-all.sh
    • Spark and Cassandra Connector

      • Install from git repo

        sudo apt-get install git
        git clone https://github.com/datastax/spark-cassandra-connector.git
      • Check version compatibility
        connector 2.4.2 / Spark 2.4 / Cassandra 3.0 / Scala 2.11

      • Assemble

        cd spark-cassandra-connector
        git checkout v2.4.2
        ./sbt/sbt assembly
      • Run Spark Shell

        spark-shell --jars [full path to the assembled spark-cassandra-connector jar]
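      • Query Cassandra from the Spark shell (example)

        Once the shell is up, here is a minimal sketch of reading the data written by Fluentd, assuming the illustrative metrics.logs table from the Cassandra section. If authentication is enabled, also pass the spark.cassandra.connection.host, spark.cassandra.auth.username, and spark.cassandra.auth.password settings to spark-shell via --conf.

        import com.datastax.spark.connector._

        // RDD API: scan the table directly
        val logs = sc.cassandraTable("metrics", "logs")
        println(logs.count())

        // DataFrame / Spark SQL API
        val df = spark.read
          .format("org.apache.spark.sql.cassandra")
          .options(Map("keyspace" -> "metrics", "table" -> "logs"))
          .load()

        df.createOrReplaceTempView("logs")
        spark.sql("SELECT count(*) AS n FROM logs").show()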
    • Tableau

      Follow this link to download the Tableau-Cassandra ODBC driver and then configure it in Tableau Desktop. Besides connecting to Cassandra, you can also connect Tableau to Spark SQL and run queries against the Spark infrastructure: Spark SQL Tableau Setup

    • To Do List

      1. Install ZooKeeper for a highly available Spark master
      2. Compare Hadoop-Spark with Cassandra-Spark
    • Reference

      1. Cassandra Official Documentation
      2. Cassandra Installation Steps
      3. Spark-Cassandra Connector Documentation
      4. Spark SQL Built-in Functions