40

As per title. I'm aware of textFile but, as the name suggests, it works only on text files. I would need to access files/directories inside a path on either HDFS or a local path. I'm using pyspark.

6 Answers 6

72

Using JVM gateway maybe is not so elegant, but in some cases the code below could be helpful:

URI           = sc._gateway.jvm.java.net.URI
Path          = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem    = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration


fs = FileSystem.get(URI("hdfs://somehost:8020"), Configuration())

status = fs.listStatus(Path('/some_dir/yet_another_one_dir/'))

for fileStatus in status:
    print(fileStatus.getPath())
6
  • 7
    use globStatus instead of fileStatus if you want to filter the results, e.g. status = fs.globStatus(Path('/some_dir/yet_another_one_dir/*.csv')) Commented Jan 19, 2017 at 22:14
  • 3
    This is quite good since it doesn't require me to upload additional libraries to spark-submit. Commented Aug 29, 2017 at 4:08
  • 1
    What is a good way to get/lookup somehost, i.e., the namenode, in pyspark? Commented Jan 16, 2019 at 15:49
  • I have thousands of files, and this piece of code files = [file.getPath() for file in status] takes a while. Is it normal? I'd say this is not the best efficent way. Commented Mar 28, 2019 at 11:11
  • 1
    @jcomeau_ictx I think you meant to say to use globStatus instead of listStatus, not fileStatus (which is just a temp variable).
    – Martim
    Commented Oct 16, 2021 at 14:52
18

I believe it's helpful to think of Spark only as a data processing tool, with a domain that begins at loading the data. It can read many formats, and it supports Hadoop glob expressions, which are terribly useful for reading from multiple paths in HDFS, but it doesn't have a builtin facility that I'm aware of for traversing directories or files, nor does it have utilities specific to interacting with Hadoop or HDFS.

There are a few available tools to do what you want, including esutil and hdfs. The hdfs lib supports both CLI and API, you can jump straight to 'how do I list HDFS files in Python' right here. It looks like this:

from hdfs import Config
client = Config().get_client('dev')
files = client.list('the_dir_path')
4
  • 5
    Hi, Can you please guide me how to make that hdfscli.cfg file, i dont know what port number to put it. [global] default.alias = dev [dev.alias] url = dev.namenode:port user = ann Commented Jul 10, 2017 at 11:50
  • 2nd @ShivamKotwalia as I cannot specify one user access to my EMR, it has to be fully dynamic and I am not passing configured users through my code
    – Talador12
    Commented Mar 14, 2019 at 22:04
  • what I am support to put in get_client('dev'), I don't have the alias 'dev'
    – Haha TTpro
    Commented May 13, 2020 at 4:48
  • @HahaTTpro you can call that function without a parameter, it is only required if you have set up alternate aliases. If you aren't sure, look for a file ~/.hdfscli.cfg, and within that search for default.alias Commented May 13, 2020 at 15:34
18

If you use PySpark, you can execute commands interactively:


List all files from a chosen directory:

hdfs dfs -ls <path> e.g.: hdfs dfs -ls /user/path:

import os
import subprocess

cmd = 'hdfs dfs -ls /user/path'
files = subprocess.check_output(cmd, shell=True).strip().split('\n')
for path in files:
  print path

Or search files in a chosen directory:

hdfs dfs -find <path> -name <expression> e.g.: hdfs dfs -find /user/path -name *.txt:

import os
import subprocess

cmd = 'hdfs dfs -find {} -name *.txt'.format(source_dir)
files = subprocess.check_output(cmd, shell=True).strip().split('\n')
for path in files:
  filename = path.split(os.path.sep)[-1].split('.txt')[0]
  print path, filename
3
  • Hi Shouldn't the files = subprocess.check_output(cmd_find).strip().split('\n') be files = subprocess.check_output(cmd).strip().split('\n') I tried editing but SO said that the edit must be greater than 6 changes. Commented Jul 10, 2017 at 10:38
  • @Darius Morawiec: how to execute hdfs dfs -rm -r command? is it using the same check_output method or some other way?
    – Shankar
    Commented Aug 8, 2018 at 6:37
  • 1
    @Shankar, for that you can use subprocess.call or subprocess.check_call.
    – Darius
    Commented Nov 27, 2018 at 13:50
6

This might work for you:

import subprocess, re
def listdir(path):
    files = str(subprocess.check_output('hdfs dfs -ls ' + path, shell=True))
    return [re.search(' (/.+)', i).group(1) for i in str(files).split("\\n") if re.search(' (/.+)', i)]

listdir('/user/')

This also worked:

hadoop = sc._jvm.org.apache.hadoop
fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration()
path = hadoop.fs.Path('/user/')
[str(f.getPath()) for f in fs.get(conf).listStatus(path)]
1
  • Your second option is the only one that has worked for me. Commented Apr 13, 2023 at 13:50
3

If you want to read in all files in a directory, check out sc.wholeTextFiles [doc], but note that the file's contents are read into the value of a single row, which is probably not the desired result.

If you want to read only some files, then generating a list of paths (using a normal hdfs ls command plus whatever filtering you need) and passing it into sqlContext.read.text [doc] and then converting from a DataFrame to an RDD seems like the best approach.

0

There is an easy way to do this using snakebite library

from snakebite.client import Client

hadoop_client = Client(HADOOP_HOST, HADOOP_PORT, use_trash=False)

for x in hadoop_client.ls(['/']):

...     print x

Not the answer you're looking for? Browse other questions tagged or ask your own question.