HDFS to elasticsearch, with pyspark / argo-workflow / kubernetes

Thomas Decaux
2 min read · Nov 11, 2022

At X.com, we love elasticsearch: we use it to power our search engines, and for a lot of analytics work.

On the other side, our genius data analysts brought Apache Spark into the company. We used to rely on a home-made ETL called Djobi to export our HDFS data to elasticsearch, but as we are migrating from HDFS to S3 and from YARN to kubernetes, Java is no longer necessary.

In a previous story, https://thomasdecaux.medium.com/run-pyspark-script-on-kubernetes-with-argo-workflow-5ac6bba5e389 , I already set the context: let’s use python to run spark jobs, scheduled with argo-workflow and running on kubernetes. So many keywords here!

argo-workflow is a fantastic tool; it brings so many features for defining jobs, but it can get messy very quickly: workflows of workflows of workflow templates.

So, here, let’s first write a workflowTemplate to run a pyspark job that executes a SQL query and writes the results into elasticsearch. Of course, we are going to use spark packages to install the elasticsearch spark library.
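A minimal sketch of such a pyspark script could look like the following. This is an illustration, not the exact production script: the connector coordinates, index name and query are placeholders, and the `build_es_options` helper is something I made up for clarity.

```python
# Hypothetical sketch: run a SQL query with Spark and write the result
# to elasticsearch through the elasticsearch-spark connector
# (installed via spark.jars.packages, e.g. org.elasticsearch:elasticsearch-spark-30_2.12).
import sys


def build_es_options(index: str, nodes: str) -> dict:
    """Connector options for writing a DataFrame to an elasticsearch index."""
    return {
        "es.resource": index,            # target index
        "es.nodes": nodes,               # elasticsearch host(s)
        "es.nodes.wan.only": "true",     # talk only to the listed nodes, no discovery
    }


def main(query: str, index: str, nodes: str) -> None:
    # Imported here so the helper above stays importable without pyspark.
    from pyspark.sql import SparkSession

    spark = SparkSession.builder.appName("sql-to-es").getOrCreate()
    df = spark.sql(query)
    (df.write
       .format("org.elasticsearch.spark.sql")
       .options(**build_es_options(index, nodes))
       .mode("overwrite")
       .save())
    spark.stop()


if __name__ == "__main__" and len(sys.argv) == 4:
    main(sys.argv[1], sys.argv[2], sys.argv[3])
```

The script takes the query, index and elasticsearch nodes as arguments, so the workflowTemplate can pass them in as input parameters.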

As security is always an important part, this example will run in the following context:

  • strong security context: root filesystem mounted read-only
  • no direct internet access; outbound traffic must go through an enterprise proxy
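In kubernetes terms, those two constraints translate to something like the snippet below (the proxy address is an illustrative placeholder, not our real one):

```yaml
# Container-level settings matching the constraints above.
securityContext:
  readOnlyRootFilesystem: true   # strong security context
  runAsNonRoot: true
env:
  - name: HTTP_PROXY             # no direct internet access:
    value: http://proxy.internal:3128   # illustrative enterprise proxy
  - name: HTTPS_PROXY
    value: http://proxy.internal:3128
```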

Workflow template => the library

Here we are going to define a WorkflowTemplate object; it’s like a function in a code library:

  • input parameters
  • code
  • returned results

Here we have many things:

  • some input parameters to customize the python script, logging, spark configuration…
  • the spark conf & log4j properties are argo-workflow artifacts, a super feature to create files on the fly without creating a K8S configMap or PVC.
  • as we download spark packages on the fly, we set up the HTTP proxy and a volume to store & cache the maven/ivy artifacts
  • we use the datatok Docker image from https://github.com/datatok/docker-spark , nothing magic inside
  • we run spark in client mode, locally, nothing really fancy; a next story will run spark on kubernetes!
  • at the end of the python script, we count how many documents are in the index and store the result as an output parameter
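Put together, a trimmed-down version of such a WorkflowTemplate could look like this. Names, the image tag, paths and the package version are illustrative placeholders, not the exact production manifest:

```yaml
apiVersion: argoproj.io/v1alpha1
kind: WorkflowTemplate
metadata:
  name: pyspark-sql-to-es          # hypothetical name
spec:
  entrypoint: run
  templates:
    - name: run
      inputs:
        parameters:
          - name: sql-query
          - name: es-index
        artifacts:
          # Raw artifacts: files created on the fly, no configMap or PVC needed.
          - name: spark-conf
            path: /opt/spark/conf/spark-defaults.conf
            raw:
              data: |
                spark.jars.packages org.elasticsearch:elasticsearch-spark-30_2.12:8.5.0
      outputs:
        parameters:
          # Filled by the python script: number of documents in the index.
          - name: doc-count
            valueFrom:
              path: /tmp/doc-count
      container:
        image: ghcr.io/datatok/spark:3.3.0   # illustrative tag
        command: ["spark-submit"]
        volumeMounts:
          - name: ivy-cache                  # cache for downloaded spark packages
            mountPath: /home/spark/.ivy2
```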

Workflow => the app that uses the library

So now we have a cool function, let’s use it!

We define some workflow arguments:

  • the date, so the workflow can run every day

Then we reference our previous workflowTemplate and set some parameters.
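For example, a Workflow calling the template through a `templateRef` might look like this (template name, index and query are hypothetical):

```yaml
apiVersion: argoproj.io/v1alpha1
kind: Workflow
metadata:
  generateName: hdfs-to-es-
spec:
  entrypoint: main
  arguments:
    parameters:
      - name: date
        value: "2022-11-11"   # injected per run, e.g. by a CronWorkflow
  templates:
    - name: main
      steps:
        - - name: export
            templateRef:
              name: pyspark-sql-to-es   # our workflowTemplate "library"
              template: run
            arguments:
              parameters:
                - name: sql-query
                  value: "SELECT * FROM events WHERE day = '{{workflow.parameters.date}}'"
                - name: es-index
                  value: "events-{{workflow.parameters.date}}"
```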

Et voilà! We can now re-use our workflow template to create a production-ready ETL with pyspark / argo-workflow / kubernetes.