As per title. I'm aware of textFile but, as the name suggests, it works only on text files.
I need to access files/directories inside a path on either HDFS or the local filesystem. I'm using pyspark.
6 Answers
Using the JVM gateway may not be very elegant, but in some cases the code below can be helpful:
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration
fs = FileSystem.get(URI("hdfs://somehost:8020"), Configuration())
status = fs.listStatus(Path('/some_dir/yet_another_one_dir/'))
for fileStatus in status:
    print(fileStatus.getPath())
- Use globStatus instead of fileStatus if you want to filter the results, e.g. status = fs.globStatus(Path('/some_dir/yet_another_one_dir/*.csv')) Commented Jan 19, 2017 at 22:14
- This is quite good since it doesn't require me to upload additional libraries to spark-submit. Commented Aug 29, 2017 at 4:08
- What is a good way to get/look up somehost, i.e., the namenode, in pyspark? Commented Jan 16, 2019 at 15:49
- I have thousands of files, and this piece of code, files = [file.getPath() for file in status], takes a while. Is that normal? I'd say this is not the most efficient way. Commented Mar 28, 2019 at 11:11
- @jcomeau_ictx I think you meant to say to use globStatus instead of listStatus, not fileStatus (which is just a temp variable). – Martim Commented Oct 16, 2021 at 14:52
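On the namenode question in the comments: one option is to avoid hard-coding the host and let Hadoop resolve the filesystem from the configuration Spark already holds. The following is only a sketch, assuming sc is a live SparkContext. glob_paths is a hypothetical helper name, and _jvm and _jsc are private PySpark attributes that may change between versions.

```python
def glob_paths(sc, pattern):
    """Return the paths matching a Hadoop glob as Python strings."""
    # Hadoop configuration already loaded by Spark (includes fs.defaultFS)
    hadoop_conf = sc._jsc.hadoopConfiguration()
    path = sc._jvm.org.apache.hadoop.fs.Path(pattern)
    # Resolve the filesystem from the path itself, so no host is hard-coded
    fs = path.getFileSystem(hadoop_conf)
    statuses = fs.globStatus(path)
    # globStatus may return null (None in Python) when nothing matches
    return [str(s.getPath()) for s in (statuses or [])]
```

A call such as glob_paths(sc, '/some_dir/yet_another_one_dir/*.csv') should then return plain strings, and sc._jsc.hadoopConfiguration().get('fs.defaultFS') shows which filesystem URI is being used by default.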
I believe it's helpful to think of Spark only as a data processing tool, with a domain that begins at loading the data. It can read many formats, and it supports Hadoop glob expressions, which are terribly useful for reading from multiple paths in HDFS, but it doesn't have a built-in facility that I'm aware of for traversing directories or files, nor does it have utilities specific to interacting with Hadoop or HDFS.
There are a few tools available to do what you want, including esutil and hdfs. The hdfs library supports both a CLI and an API, and its documentation covers "how do I list HDFS files in Python" directly. It looks like this:
from hdfs import Config
client = Config().get_client('dev')
files = client.list('the_dir_path')
- Hi, can you please guide me on how to make that hdfscli.cfg file? I don't know what port number to put in it. [global] default.alias = dev [dev.alias] url = dev.namenode:port user = ann Commented Jul 10, 2017 at 11:50
- 2nd @ShivamKotwalia, as I cannot specify one user's access to my EMR; it has to be fully dynamic, and I am not passing configured users through my code. Commented Mar 14, 2019 at 22:04
- What am I supposed to put in get_client('dev')? I don't have the alias 'dev'. Commented May 13, 2020 at 4:48
- @HahaTTpro you can call that function without a parameter; it is only required if you have set up alternate aliases. If you aren't sure, look for a file ~/.hdfscli.cfg, and within that search for default.alias. Commented May 13, 2020 at 15:34
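For the question about creating the hdfscli.cfg file, the layout quoted in the comments can be generated with the standard library. This is a rough sketch: render_hdfscli_cfg is a hypothetical helper, the http:// scheme on the url is an assumption (the hdfs library talks to WebHDFS over HTTP), and the port is left as the same placeholder used in the comment because it depends on the cluster.

```python
import configparser
import io

def render_hdfscli_cfg(alias, url, user):
    """Return the text of an hdfscli.cfg that defines a single alias."""
    cfg = configparser.ConfigParser()
    cfg['global'] = {'default.alias': alias}
    cfg[alias + '.alias'] = {'url': url, 'user': user}
    buf = io.StringIO()
    cfg.write(buf)
    return buf.getvalue()

# 'port' is a placeholder, not a real port number
print(render_hdfscli_cfg('dev', 'http://dev.namenode:port', 'ann'))
```

The resulting text would be saved as ~/.hdfscli.cfg, the location mentioned in the comment above.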
If you use PySpark, you can execute commands interactively:
List all files in a chosen directory:
hdfs dfs -ls <path>
e.g.: hdfs dfs -ls /user/path:
import subprocess

cmd = 'hdfs dfs -ls /user/path'
# check_output returns bytes on Python 3, so decode before splitting into lines
files = subprocess.check_output(cmd, shell=True).decode().strip().split('\n')
for path in files:
    print(path)
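Each line printed by the loop above is a full listing row, not just a path. If only the path and a few attributes are needed, the text can be split into columns. The sketch below assumes the usual eight-column layout of hdfs dfs -ls; parse_ls is a hypothetical helper and the sample listing is made up for illustration.

```python
def parse_ls(output):
    """Parse `hdfs dfs -ls` text into (is_dir, size_bytes, path) tuples."""
    entries = []
    for line in output.splitlines():
        # Columns: permissions, replication, owner, group, size, date, time, path
        fields = line.split(None, 7)
        if len(fields) < 8:
            continue  # skips the "Found N items" header and blank lines
        entries.append((fields[0].startswith('d'), int(fields[4]), fields[7]))
    return entries

# Made-up sample in the usual `hdfs dfs -ls` layout
sample = """Found 2 items
drwxr-xr-x   - ann supergroup          0 2017-01-19 22:14 /user/path/logs
-rw-r--r--   3 ann supergroup       1234 2017-01-19 22:14 /user/path/data.txt
"""
for is_dir, size, path in parse_ls(sample):
    print(path, 'dir' if is_dir else size)
```

Splitting with a maximum of seven splits keeps paths that contain spaces intact in the last column.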
Or search files in a chosen directory:
hdfs dfs -find <path> -name <expression>
e.g.: hdfs dfs -find /user/path -name *.txt:
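import os
import subprocess

source_dir = '/user/path'  # example directory to search
# Quote the pattern so the local shell does not expand it before hdfs sees it
cmd = 'hdfs dfs -find {} -name "*.txt"'.format(source_dir)
files = subprocess.check_output(cmd, shell=True).decode().strip().split('\n')
for path in files:
    # File name without its directory and .txt extension
    filename = path.split(os.path.sep)[-1].split('.txt')[0]
    print(path, filename)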
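A variant of the search above that skips the shell entirely, which sidesteps quoting of the *.txt pattern, and uses posixpath for the name handling since HDFS paths always use '/'. This is a sketch only; find_cmd, stem and find_files are hypothetical helper names, and find_files needs the hdfs CLI on the PATH.

```python
import posixpath
import subprocess

def find_cmd(source_dir, pattern):
    """Build the argument list for `hdfs dfs -find`; no shell is involved."""
    return ['hdfs', 'dfs', '-find', source_dir, '-name', pattern]

def stem(path):
    """File name without its directory or final extension."""
    return posixpath.splitext(posixpath.basename(path))[0]

def find_files(source_dir, pattern='*.txt'):
    # Requires the hdfs CLI; defined here but not called in this example
    out = subprocess.check_output(find_cmd(source_dir, pattern))
    return out.decode().splitlines()

print(stem('/user/path/report.txt'))
```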
- Hi, shouldn't files = subprocess.check_output(cmd_find).strip().split('\n') be files = subprocess.check_output(cmd).strip().split('\n')? I tried editing, but SO said that the edit must be greater than 6 changes. Commented Jul 10, 2017 at 10:38
- @Darius Morawiec: how do I execute the hdfs dfs -rm -r command? Is it using the same check_output method or some other way? – Shankar Commented Aug 8, 2018 at 6:37
- @Shankar, for that you can use subprocess.call or subprocess.check_call. – Darius Commented Nov 27, 2018 at 13:50
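The check_call suggestion for hdfs dfs -rm -r might look like the sketch below. hdfs_rm and its runner parameter are hypothetical; the runner defaults to subprocess.check_call and exists only so the command can be inspected without touching a cluster. Note that this deletes data when run for real.

```python
import subprocess

def hdfs_rm(path, recursive=True, runner=subprocess.check_call):
    """Run `hdfs dfs -rm [-r] <path>` and return the command that was used."""
    cmd = ['hdfs', 'dfs', '-rm']
    if recursive:
        cmd.append('-r')
    cmd.append(path)
    # check_call raises CalledProcessError if hdfs exits with a non-zero status
    runner(cmd)
    return cmd

# Dry run: print the command instead of executing it
hdfs_rm('/user/path/old_output', runner=print)
```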
This might work for you:
import subprocess, re

def listdir(path):
    # Decode the bytes output and keep the path column of each 'hdfs dfs -ls' line
    out = subprocess.check_output('hdfs dfs -ls ' + path, shell=True).decode()
    matches = (re.search(' (/.+)', line) for line in out.splitlines())
    return [m.group(1) for m in matches if m]

listdir('/user/')
This also worked:
hadoop = sc._jvm.org.apache.hadoop
fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration()
path = hadoop.fs.Path('/user/')
[str(f.getPath()) for f in fs.get(conf).listStatus(path)]
- Your second option is the only one that has worked for me. Commented Apr 13, 2023 at 13:50
If you want to read in all files in a directory, check out sc.wholeTextFiles, but note that the file's contents are read into the value of a single row, which is probably not the desired result.
If you want to read only some files, then generating a list of paths (using a normal hdfs ls command plus whatever filtering you need), passing it into sqlContext.read.text, and then converting from a DataFrame to an RDD seems like the best approach.
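The "list, filter, then read" approach could be sketched as follows. select_paths and read_as_rdd are hypothetical helper names, the listing is made-up sample data standing in for the output of an hdfs ls call, and read_as_rdd assumes an existing sqlContext.

```python
import fnmatch

def select_paths(paths, pattern):
    """Keep only the paths whose full string matches a shell-style pattern."""
    return [p for p in paths if fnmatch.fnmatchcase(p, pattern)]

def read_as_rdd(sqlContext, paths):
    # read.text accepts a list of paths; .rdd converts the DataFrame to an RDD of Rows
    return sqlContext.read.text(paths).rdd

# Made-up listing for illustration
listing = ['/user/path/a.txt', '/user/path/b.csv', '/user/path/c.txt']
print(select_paths(listing, '*.txt'))
```

With a live session, read_as_rdd(sqlContext, select_paths(listing, '*.txt')) would then load only the selected files.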
There is an easy way to do this using the snakebite library:
from snakebite.client import Client
hadoop_client = Client(HADOOP_HOST, HADOOP_PORT, use_trash=False)
for x in hadoop_client.ls(['/']):
    print(x)