PySpark and HDFS commands

I would like to do some cleanup at the start of my Spark program (PySpark). For example, I would like to delete data in HDFS left over from a previous run. In Pig this can be done using commands such as

fs -copyFromLocal ....

rmf /path/to/hdfs

or locally using the sh command.

I was wondering how to do the same with Pyspark.

You can execute arbitrary shell commands using, for example, subprocess.call or the sh library, so something like this should work just fine:

import subprocess

some_path = ...
subprocess.call(["hadoop", "fs", "-rm", "-f", some_path])
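
For the sh library route mentioned above, a minimal sketch (assuming sh is installed and the hadoop binary is on the PATH; the path is a placeholder for illustration):

import sh

some_path = "/tmp/previous_run_output"  # placeholder path, not from the original post
# sh resolves attribute access to executables on the PATH, so this runs
# `hadoop fs -rm -f /tmp/previous_run_output`
sh.hadoop("fs", "-rm", "-f", some_path)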

If you use Python 2.x you can try using spotify/snakebite:

from snakebite.client import Client

host = ...
port = ...
client = Client(host, port)
client.delete(some_path, recurse=True)

hdfs3 is yet another library which can be used to do the same thing:

from hdfs3 import HDFileSystem

hdfs = HDFileSystem(host=host, port=port)
hdfs.rm(some_path)

The Apache Arrow Python bindings are the latest option (and they are often already available on a Spark cluster, since PyArrow is required for pandas_udf):

from pyarrow import hdfs

fs = hdfs.connect(host, port)
fs.delete(some_path, recursive=True)
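
In more recent PyArrow releases the legacy pyarrow.hdfs module is deprecated in favor of pyarrow.fs.HadoopFileSystem; a minimal sketch of the same delete, reusing the host/port/some_path placeholders from above:

from pyarrow import fs

# connects through libhdfs, much like pyarrow.hdfs.connect did
hdfs = fs.HadoopFileSystem(host, port=port)
hdfs.delete_dir(some_path)       # for a directory
# hdfs.delete_file(some_path)    # for a single file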

Interacting with HDFS from PySpark: you can execute HDFS commands from Spark with Python to list, delete, or perform other HDFS operations. Use the hadoop fs -put command to copy files into HDFS; it distributes your data files across the cluster's datanodes. The hadoop fs command should be on your command path by default, and its documentation lists further options for listing files in HDFS, deleting HDFS files, and copying files out.
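
If you prefer to drive those same hadoop fs commands from inside your PySpark script rather than a shell, a small sketch using subprocess (the file and directory names are placeholders, not from the original post):

import subprocess

local_file = "/tmp/data.csv"        # placeholder local file
hdfs_dir = "/user/me/input/"        # placeholder HDFS directory

# copy a local file into HDFS, then list the target directory
subprocess.call(["hadoop", "fs", "-put", local_file, hdfs_dir])
subprocess.call(["hadoop", "fs", "-ls", hdfs_dir])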

You can delete an HDFS path in PySpark without using third-party dependencies as follows:

from pyspark.sql import SparkSession
# example of preparing a spark session
spark = SparkSession.builder.appName('abc').getOrCreate()
sc = spark.sparkContext
# Prepare a FileSystem manager
fs = (sc._jvm.org
      .apache.hadoop
      .fs.FileSystem
      .get(sc._jsc.hadoopConfiguration())
      )
path = "Your/hdfs/path"
# use the FileSystem manager to remove the path
fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True)

To go one step further, you can wrap the above idea in a helper function that you can reuse across jobs/packages:

from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('abc').getOrCreate()

def delete_path(spark, path):
    sc = spark.sparkContext
    fs = (sc._jvm.org
          .apache.hadoop
          .fs.FileSystem
          .get(sc._jsc.hadoopConfiguration())
          )
    fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True)

delete_path(spark, "Your/hdfs/path")
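
The Hadoop FileSystem API also exposes exists(), so you can skip the call when the path is absent; a small, hypothetical variation on the helper above (delete_path_if_exists is a made-up name, not part of any library):

def delete_path_if_exists(spark, path):
    sc = spark.sparkContext
    jvm = sc._jvm
    fs = jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
    jvm_path = jvm.org.apache.hadoop.fs.Path(path)
    # only delete (recursively) if the path is actually there
    if fs.exists(jvm_path):
        fs.delete(jvm_path, True)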

HDFS and Spark FAQ: together, Spark and HDFS offer powerful capabilities for writing simple code; other commands are also available, and running hdfs dfs with no parameters lists them. There are a few available tools to do what you want, including esutil and hdfs. The hdfs lib supports both a CLI and an API (you can jump straight to 'how do I list HDFS files in Python' right here). It looks like this:

from hdfs import Config

client = Config().get_client('dev')
files = client.list('the_dir_path')
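
Since the question is about cleanup, the same client can also delete paths; a one-line sketch, assuming the 'dev' alias and the client from the snippet above:

# recursively delete a directory with the hdfs lib client
client.delete('the_dir_path', recursive=True)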

From https://diogoalexandrefranco.github.io/interacting-with-hdfs-from-pyspark/, using only PySpark:

######
# Get fs handler from java gateway
######
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
fs = FileSystem.get(URI("hdfs://somehost:8020"), sc._jsc.hadoopConfiguration())

# We can now use the Hadoop FileSystem API (https://hadoop.apache.org/docs/current/api/org/apache/hadoop/fs/FileSystem.html)
fs.listStatus(Path('/user/hive/warehouse'))
# or
fs.delete(Path('some_path'))

The other solutions didn't work in my case, but this blog post helped :)

Accessing HDFS files from Spark: when accessing an HDFS file from PySpark, you must set HADOOP_CONF_DIR as an environment variable. It is easy to run a Hadoop command in a shell or a shell script, but there is often a need to manipulate HDFS files directly from Python, for example to list or save HDFS files. Since we already know how to call an external shell command from Python, we can simply call the Hadoop command that way.
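
A sketch of what that can look like with subprocess.run (Python 3.7+; the listed path is a placeholder):

import subprocess

result = subprocess.run(
    ["hadoop", "fs", "-ls", "/user"],   # placeholder HDFS path
    capture_output=True, text=True, check=True,
)
print(result.stdout)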

Comments
  • You cannot do such a thing with Spark itself. Maybe the best option is to use an Oozie workflow, in which you can put both HDFS commands and Spark jobs and combine them according to the logic you prefer.
  • Is somehost a host node running a particular service, or any other specification? I am getting a java.net.ConnectException: Connection refused error when trying with an httpfs-hosting node.