
ApproximateStream

1) To deploy a Kafka - Spark Structured Streaming cluster on Azure, follow the Microsoft tutorial "Use Apache Spark Structured Streaming with Apache Kafka on HDInsight" (SparkStruturedStreaming-kafka-Azure)

2) Install jq on your local machine

sudo apt update
sudo apt install jq

3) Deploy the Spark and Kafka clusters using the template that is available on the Microsoft website (an Azure CLI sketch follows the cluster-size tables below):

Cluster size - Spark

Node type|Node size|Number of nodes
---------|---------|---------------
Head|D12 v2 (4 Cores, 28 GB RAM)|2
Worker|D13 v2 (8 Cores, 56 GB RAM)|4
Zookeeper|A2 v2 (2 Cores, 4 GB RAM)|3

Cluster size - Kafka

Node type|Node size|Number of nodes
---------|---------|---------------
Head|D12 v2 (4 Cores, 28 GB RAM)|2
Worker|D13 v2 (8 Cores, 56 GB RAM)|4
Zookeeper|A2 v2 (2 Cores, 4 GB RAM)|3
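If you prefer the command line over the portal's "Deploy to Azure" button, a deployment along the following lines should work. This is only a sketch: RESOURCE_GROUP, LOCATION and TEMPLATE_URI are placeholders for your resource group, region, and the template link from the tutorial in step 1, and the parameters the template expects are not shown.

# sketch: deploy the Resource Manager template with the Azure CLI (placeholder values, not project values)
az group create --name RESOURCE_GROUP --location LOCATION
az deployment group create --resource-group RESOURCE_GROUP --template-uri "TEMPLATE_URI"
# add --parameters ... for whatever the template itself requires (cluster names, login, SSH user, ...)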

4) Start using the cluster

  1. Gather host information
export password='YOUR_KAFKA_CLUSTER_PASSWORD'
export CLUSTERNAME=YOUR_KAFKA_CLUSTER_NAME


curl -sS -u admin:$password -G "https://YOUR_KAFKA_CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/skafka/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2
curl -sS -u admin:$password -G https://YOUR_KAFKA_CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/skafka/services/KAFKA/components/KAFKA_BROKER | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2
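If you want to keep these values for the later steps, the same two calls can be stored in environment variables (a small sketch; the KAFKAZKHOSTS name is only used here for illustration, while KAFKABROKERS matches the variable used in step 7):

# sketch: keep the ZooKeeper hosts and Kafka brokers in environment variables
export KAFKAZKHOSTS=$(curl -sS -u admin:$password -G "https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/ZOOKEEPER/components/ZOOKEEPER_SERVER" | jq -r '["\(.host_components[].HostRoles.host_name):2181"] | join(",")' | cut -d',' -f1,2)
export KAFKABROKERS=$(curl -sS -u admin:$password -G "https://$CLUSTERNAME.azurehdinsight.net/api/v1/clusters/$CLUSTERNAME/services/KAFKA/components/KAFKA_BROKER" | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2)
echo "ZooKeeper hosts: $KAFKAZKHOSTS"
echo "Kafka brokers:   $KAFKABROKERS"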

5) Copying required jars!

this project depends on some spatial processing libraries built on top of Apache Spark; you need to make them available to the cluster in order to be able to call them from Jupyter
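One way to make such jars available (a sketch, not the project's exact procedure; SPATIAL_LIB.jar, the cluster name and the /jars path are placeholders) is to copy them to the Spark cluster head node and stage them on the cluster's default storage:

# sketch: stage a spatial-processing jar on the Spark cluster's default storage
scp SPATIAL_LIB.jar USER_NAME@YOUR_SPARK_CLUSTER_NAME-ssh.azurehdinsight.net:SPATIAL_LIB.jar
ssh USER_NAME@YOUR_SPARK_CLUSTER_NAME-ssh.azurehdinsight.net
hdfs dfs -mkdir -p /jars
hdfs dfs -put SPATIAL_LIB.jar /jars/SPATIAL_LIB.jar

Once the jars are on the default storage they can be referenced from a Jupyter cell, for example with the %%configure magic.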

6) Copying files

in order to be able to access the neighborhoods .geojson file (the shenzhen_converted.geojson regions file),
we need to store it in blob storage:
- go to "HDInsight clusters" --> Spark cluster name --> search for "storage accounts",
- select the "Azure Storage" name 
- storage explorer --> blob containers --> sspark --> create new folder "datasets"
- upload shenzhen_converted.geojson

find it here
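As an alternative to the portal steps above, the upload can also be done with the Azure CLI (a sketch; CONTAINER_NAME and STORAGE_ACCOUNT_NAME are the same placeholders explained below, and --auth-mode login assumes your signed-in account has access to the storage account):

# sketch: upload the regions file into the "datasets" folder of the container
az storage blob upload --account-name STORAGE_ACCOUNT_NAME --container-name CONTAINER_NAME --name datasets/shenzhen_converted.geojson --file shenzhen_converted.geojson --auth-mode login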

- then you can access it in your notebook using:
- "wasb://CONTAINER_NAME@STORAGE_ACCOUNT_NAME.blob.core.windows.net/datasets/shenzhen_converted.geojson"
- in this walkthrough the container is named sspark, after the Spark cluster

Replace CONTAINER_NAME with the name of the container in your Spark storage account where you hosted the shenzhen_converted.geojson regions file, and replace STORAGE_ACCOUNT_NAME with the name of your Spark storage account.
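To check that the path resolves before using it in a notebook, you can list the folder from the Spark cluster head node (assuming you have SSH'd into the Spark cluster):

# check that the uploaded regions file is reachable over the wasb:// scheme
hdfs dfs -ls "wasb://CONTAINER_NAME@STORAGE_ACCOUNT_NAME.blob.core.windows.net/datasets/"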

Copy the guang.csv input file (used later by the Kafka producer) to the Kafka cluster head node:

scp guang.csv USER_NAME@YOUR_KAFKA_CLUSTER_NAME-ssh.azurehdinsight.net:guang.csv

7) Run the Kafka producer

  1. Create the topic from Jupyter (see the note and the sketch at the end of this step)
  2. Log in to the head node of the Kafka cluster
    • navigate to the Kafka cluster ‘skafka’ --> SSH + Cluster login
    • copy the login command and use it in your local machine’s terminal

      ssh USER_NAME@YOUR_KAFKA_CLUSTER_NAME-ssh.azurehdinsight.net

  3. Get the Kafka brokers list by running the following commands on your local machine
sudo apt -y install jq
export password='YOUR_KAFKA_CLUSTER_PASSWORD'

export KAFKABROKERS=$(curl -sS -u admin:$password -G "https://YOUR_KAFKA_CLUSTER_NAME.azurehdinsight.net/api/v1/clusters/YOUR_KAFKA_CLUSTER_NAME/services/KAFKA/components/KAFKA_BROKER" | jq -r '["\(.host_components[].HostRoles.host_name):9092"] | join(",")' | cut -d',' -f1,2);
  4. Run the following command to start the Kafka producer on the Kafka cluster head node
**you need to create the topic first, for example in the Jupyter notebook with the %%bash magic command; a sketch is given after the command below**
java -jar kafka-producer-consumer.jar shenzhen spatial1 $KAFKABROKERS /home/isam/guang.csv 1
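If you prefer to create the topic from the Kafka head node instead of Jupyter, a command along these lines can be used (a sketch: the topic name shenzhen is assumed from the producer arguments above, and $KAFKAZKHOSTS must hold the ZooKeeper host:port list gathered in step 4):

# sketch: create the topic on the Kafka cluster before starting the producer
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --zookeeper $KAFKAZKHOSTS --replication-factor 3 --partitions 8 --topic shenzhen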

References