
I am working on a requirement to get stats about files stored in Linux using Scala.

We will pass the root directory as input, and the code will get the complete list of subdirectories under that root.

Then, for each directory in the list, I will get its files, and for each file I will get the owner, group, permissions, last modified time, creation time, and last access time.

The problem is: how can I process the directory list in parallel to get the stats of the files stored in each directory?

In the production environment we have 100,000+ folders inside the root folder, so my list contains 100,000+ entries.

How can I parallelize my operation (file stats) over this list?

Since I am new to Scala, please help me with this requirement.

Sorry for posting without a code snippet.

Thanks.

  • This is most likely bottlenecked by the file-system returning the list of files in a directory and that cannot be multi-threaded.
    – Thilo
    Commented Nov 10, 2019 at 12:18
  • Instead of using the parallel-collections library, this sounds like a job for Future, where every subdirectory is processed in its own Future. This sounds like an interesting program, so I will work on it tomorrow.
    – Allen Han
    Commented Nov 10, 2019 at 14:13
  • What kind of output are you expecting? I have two questions: 1) Are multiple output files allowed? 2) Does the output have to reflect the structure of the file hierarchy? It would be easier to implement the solution if the answers to the two questions were 1) yes and 2) no. For number two, to explain what I mean, I am asking if the subdirectories can be interleaved in no fixed order. I would prefer to print the data so that only the files of each subdirectory are grouped together, but not to impose order on the "chunk" each subdirectory represents. I can print the path of the subdirectory ...
    – Allen Han
    Commented Nov 10, 2019 at 15:23
  • ... above the file data for that subdirectory, so that the user knows what data they are looking at. But I would prefer that the data be allowed to be scrambled. That would make the algorithm faster. However, the answers to the questions I am asking can be different from what I would prefer. That would just make the implementation slower or less pretty.
    – Allen Han
    Commented Nov 10, 2019 at 15:25
  • Also, I would like to add that if the answers to the questions are 1) no and 2) yes, then implementing the solution will be constrained by the amount of data I can hold in memory. That means I would have to store all the data in a database during the traversal and retrieve it all at the end. That would give you the cleanest data - everything in a single log file, ordered in a hierarchy - but it would be slower because, although the traversal uses multithreading, the retrieval of the data from the database to produce the log file at the end won't be multithreaded.
    – Allen Han
    Commented Nov 10, 2019 at 15:33

1 Answer


I ended up using Akka actors.

I made assumptions about your desired output so that the program would be simple and fast. The assumptions are that the output is JSON, the hierarchy is not preserved, and multiple output files are acceptable. If you don't like JSON, you can replace it with something else, but the other two assumptions are important for keeping the current speed and simplicity of the program.

There are some command line parameters you can set. If you don't set them, then defaults will be used. The defaults are contained in Main.scala.

The command line parameters are as follows:

(0) the root directory you are starting from; (no default)

(1) the timeout interval (in seconds) for all the timeouts in this program; (default is 60)

(2) the number of printer actors to use; this will be the number of log files created; (default is 50)

(3) the tick interval used by the printer actors when reporting to the monitor actor, i.e. the number of writes between heartbeats; (default is 500)

For the timeout, keep in mind that this is the interval the program waits for at completion. So if you run a small job and wonder why it is taking a minute to finish, it is because the program waits for the timeout interval to elapse before shutting down.

Because you are running such a large job, it is possible that the default timeout of 60 seconds is too small. If you are getting timeout exceptions, increase the timeout value.

Please note that if your tick interval is set too high, there is a chance your program will close prematurely.

To run, just start sbt in the project folder and type

runMain Main <canonical path of root directory>

I couldn't figure out how to get the group of a File in Java. You'll need to research that and add the relevant code to Entity.scala and TraverseActor.scala.
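For reference, on POSIX file systems the owning group is exposed through the `PosixFileAttributes` view of `java.nio.file` (the same package the answer already uses for owner and permissions). This is a minimal Java sketch, assuming a POSIX file system; the class name `GroupLookup` is just illustrative:

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFileAttributes;

public class GroupLookup {
    public static void main(String[] args) throws Exception {
        Path p = Files.createTempFile("stats", ".tmp");
        // The POSIX attribute view carries the owning group alongside
        // owner and permissions; it is unavailable on non-POSIX systems.
        PosixFileAttributes attrs = Files.readAttributes(p, PosixFileAttributes.class);
        System.out.println("group: " + attrs.group().getName());
        Files.delete(p);
    }
}
```

The same call should slot into the `Try(...)` pattern already used in TraverseActor.scala for owner and permissions.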

Also, f.list() in TraverseActor.scala sometimes came back as null, which is why I wrapped it in an Option. You'll have to debug that issue to make sure you aren't failing silently on certain directories.
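To illustrate why that null shows up: `File.list()` returns null both when the path is not a directory and when an I/O error occurs, with no way to tell which. `Files.newDirectoryStream` throws a typed exception instead, so failures can't pass silently. A small Java demo (the names `ListNullDemo` and `notADir` are just illustrative):

```java
import java.io.File;
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.NotDirectoryException;
import java.nio.file.Path;

public class ListNullDemo {
    public static void main(String[] args) throws IOException {
        // File.list() returns null for non-directories and on I/O errors.
        File notADir = File.createTempFile("demo", ".txt");
        System.out.println(notADir.list()); // null

        // Files.newDirectoryStream surfaces the failure cause instead.
        try (DirectoryStream<Path> ds = Files.newDirectoryStream(notADir.toPath())) {
            ds.forEach(System.out::println);
        } catch (NotDirectoryException e) {
            System.out.println("not a directory: " + e.getMessage());
        }
        notADir.delete();
    }
}
```

Switching the traversal to `Files.newDirectoryStream` would be one way to distinguish a genuine I/O error from a non-directory instead of collapsing both into `None`.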

Now, here are the contents of all the files.

build.sbt

name := "stackoverflow20191110"

version := "0.1"

scalaVersion := "2.12.1"

libraryDependencies ++= Seq(
  "io.circe" %% "circe-core",
  "io.circe" %% "circe-generic",
  "io.circe" %% "circe-parser"
).map(_ % "0.12.2")

libraryDependencies += "com.typesafe.akka" %% "akka-actor" % "2.4.16"

Entity.scala

import io.circe.Encoder
import io.circe.generic.semiauto._

sealed trait Entity {
  def path: String
  def owner: String
  def permissions: String
  def lastModifiedTime: String
  def creationTime: String
  def lastAccessTime: String
  def hashCode: Int
}

object Entity {
  implicit val entityEncoder: Encoder[Entity] = deriveEncoder
}

case class FileEntity(path: String, owner: String, permissions: String, lastModifiedTime: String, creationTime: String, lastAccessTime: String) extends Entity

object FileEntity {
  implicit val fileEntityEncoder: Encoder[FileEntity] = deriveEncoder
}

case class DirectoryEntity(path: String, owner: String, permissions: String, lastModifiedTime: String, creationTime: String, lastAccessTime: String) extends Entity

object DirectoryEntity {
  implicit val directoryentityEncoder: Encoder[DirectoryEntity] = deriveEncoder
}

case class Contents(path: String, files: IndexedSeq[Entity])

object Contents {
  implicit val contentsEncoder: Encoder[Contents] = deriveEncoder
}

Main.scala

import akka.actor.ActorSystem
import akka.pattern.ask
import akka.util.Timeout
import java.io.{BufferedWriter, File, FileWriter}

import ShutDownActor.ShutDownYet

import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

object Main {

  val defaultNumPrinters = 50

  val defaultMonitorTickInterval = 500

  val defaultTimeoutInS = 60

  def main(args: Array[String]): Unit = {
    val timeoutInS = Try(args(1).toInt).toOption.getOrElse(defaultTimeoutInS)

    val system = ActorSystem("SearchHierarchy")

    val shutdown = system.actorOf(ShutDownActor.props)

    val monitor = system.actorOf(MonitorActor.props(shutdown, timeoutInS))

    val refs = (0 until Try(args(2).toInt).toOption.getOrElse(defaultNumPrinters)).map{x =>
      val name = "logfile" + x
      (name, system.actorOf(PrintActor.props(name, Try(args(3).toInt).toOption.getOrElse(defaultMonitorTickInterval), monitor)))
    }

    val root = system.actorOf(TraverseActor.props(new File(args(0)), refs))

    implicit val askTimeout = Timeout(timeoutInS.seconds)

    var isTimedOut = false

    while(!isTimedOut){
      Thread.sleep(30000)
      val fut = (shutdown ? ShutDownYet).mapTo[Boolean]
      isTimedOut = Await.result(fut, timeoutInS.seconds)
    }

    refs.foreach{ x =>
      val fw = new BufferedWriter(new FileWriter(new File(x._1), true))
      fw.write("{}\n]")
      fw.close()
    }

    system.terminate()
  }

}

MonitorActor.scala

import MonitorActor.ShutDown
import akka.actor.{Actor, ActorRef, Props, ReceiveTimeout, Stash}
import io.circe.syntax._

import scala.concurrent.duration._

class MonitorActor(shutdownActor: ActorRef, timeoutInS: Int) extends Actor with Stash {

  context.setReceiveTimeout(timeoutInS.seconds)

  override def receive: Receive = {
    case ReceiveTimeout =>
      shutdownActor ! ShutDown
  }

}

object MonitorActor {
  def props(shutdownActor: ActorRef, timeoutInS: Int) = Props(new MonitorActor(shutdownActor, timeoutInS))

  case object ShutDown
}

PrintActor.scala

import java.io.{BufferedWriter, File, FileWriter, PrintWriter}

import akka.actor.{Actor, ActorRef, Props, Stash}
import PrintActor.{Count, HeartBeat}

class PrintActor(name: String, interval: Int, monitorActor: ActorRef) extends Actor with Stash {

  val file = new File(name)

  override def preStart = {
    val fw = new BufferedWriter(new FileWriter(file, true))
    fw.write("[\n")
    fw.close()

    self ! Count(0)
  }

  override def receive: Receive = {
    case Count(c) =>
      context.become(withCount(c))
      unstashAll()

    case _ =>
      stash()
  }

  def withCount(c: Int): Receive = {
    case s: String =>
      val fw = new BufferedWriter(new FileWriter(file, true))
      fw.write(s)
      fw.write(",\n")
      fw.close()

      if (c == interval) {
        monitorActor ! HeartBeat
        context.become(withCount(0))
      } else {
        context.become(withCount(c+1))
      }
  }

}

object PrintActor {
  def props(name: String, interval: Int, monitorActor: ActorRef) = Props(new PrintActor(name, interval, monitorActor))

  case class Count(count: Int)

  case object HeartBeat
}

ShutDownActor.scala

import MonitorActor.ShutDown
import ShutDownActor.ShutDownYet
import akka.actor.{Actor, Props, Stash}

class ShutDownActor() extends Actor with Stash {

  override def receive: Receive = {
    case ShutDownYet => sender ! false
    case ShutDown => context.become(canShutDown())
  }

  def canShutDown(): Receive = {
    case ShutDownYet => sender ! true
  }

}

object ShutDownActor {
  def props = Props(new ShutDownActor())

  case object ShutDownYet
}

TraverseActor.scala

import java.io.File

import akka.actor.{Actor, ActorRef, PoisonPill, Props, ReceiveTimeout}
import io.circe.syntax._

import scala.concurrent.duration._
import scala.util.Try

class TraverseActor(start: File, printers: IndexedSeq[(String, ActorRef)]) extends Actor{

  val hash = start.hashCode()
  val mod = hash % printers.size
  val idx = if (mod < 0) -mod else mod
  val myPrinter = printers(idx)._2

  override def preStart = {
    self ! start
  }

  override def receive: Receive = {
    case f: File =>
      val path = f.getCanonicalPath
      val files = Option(f.list()).map(_.toIndexedSeq.map(x =>new File(path + "/" + x)))

      val directories = files.map(_.filter(_.isDirectory))

      directories.foreach(ds => processDirectories(ds))

      val entities = files.map{fs =>
        fs.map{ f =>
          val path = f.getCanonicalPath
          val owner = Try(java.nio.file.Files.getOwner(f.toPath).toString).toOption.getOrElse("")
          val permissions = Try(java.nio.file.Files.getPosixFilePermissions(f.toPath).toString).toOption.getOrElse("")
          val attributes = Try(java.nio.file.Files.readAttributes(f.toPath, "lastModifiedTime,creationTime,lastAccessTime"))
          val lastModifiedTime = attributes.flatMap(a => Try(a.get("lastModifiedTime").toString)).toOption.getOrElse("")
          val creationTime = attributes.flatMap(a => Try(a.get("creationTime").toString)).toOption.getOrElse("")
          val lastAccessTime = attributes.flatMap(a => Try(a.get("lastAccessTime").toString)).toOption.getOrElse("")

          // Directories get a DirectoryEntity, plain files a FileEntity.
          if (f.isDirectory) DirectoryEntity(path, owner, permissions, lastModifiedTime, creationTime, lastAccessTime)
          else FileEntity(path, owner, permissions, lastModifiedTime, creationTime, lastAccessTime)
        }
      }

      directories match {
        case Some(seq) =>
          seq match {
            case x +: xs => // non-empty: pending work keeps this actor alive
            case IndexedSeq() => self ! PoisonPill
          }
        case None => self ! PoisonPill
      }

      entities.foreach(e => myPrinter ! Contents(f.getCanonicalPath, e).asJson.toString)
  }

  def processDirectories(directories: IndexedSeq[File]): Unit = {
    def inner(fs: IndexedSeq[File]): Unit = {
      fs match {
        case x +: xs =>
          context.actorOf(TraverseActor.props(x, printers))
          inner(xs) // recurse within inner: only the first directory goes to self
        case IndexedSeq() =>
      }

    }

    directories match {
      case x +: xs =>
        self ! x
        inner(xs)
      case IndexedSeq() =>
    }
  }

}

object TraverseActor {
  def props(start: File, printers: IndexedSeq[(String, ActorRef)]) = Props(new TraverseActor(start, printers))
}

I only tested on a small example, so it is possible this program will run into problems when running your job. If that happens, feel free to ask questions.
