2

I have a pipeline written in Go that I want to execute with Spark runner, the Spark Standalone is installed on my local machine.

  • Apache Beam 2.56.0

  • Apache Spark 3.2.2

I started Spark master and worker from the installation dir with this commands.

# for master
./sbin/start-master.sh -h localhost

# for worker
./sbin/start-worker.sh spark://localhost:7077

Then I started the beam_spark3_job_server and mounted /tmp

docker run -v /tmp:/tmp --net=host apache/beam_spark3_job_server:2.56.0 \
        --spark-master-url=spark://localhost:7077

Now, from the Go project running

go run main.go --runner PortableRunner \
        --endpoint localhost:8099 \
        --environment_type LOOPBACK

works fine, but the environment_type is set as LOOPBACK.
So if I want to remove it and run the script again without it (by default it's set to DOCKER)

go run main.go --runner PortableRunner \
        --endpoint localhost:8099

with that I get this on the console

java.lang.IllegalStateException: No container running for id xxxxx

Though it's different than in this thread No container running for id xxxxxx when running apache beam go sdk examples, because using the docker run command with -v solved the issue with not finding the file in the /tmp/beam-artifact-staging.

Nonetheless the issue still persists.
These are some fragments of logs from Spark

24/07/02 15:21:37 DEBUG DockerEnvironmentFactory: Creating Docker Container with ID 1-1
24/07/02 15:21:39 DEBUG DockerEnvironmentFactory: Created Docker Container with Container ID 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4
24/07/02 15:21:39 INFO GrpcLoggingService: Beam Fn Logging client connected.
24/07/02 15:21:39 DEBUG : Initializing Go harness: /opt/apache/beam/boot --id=1-1 --provision_endpoint=localhost:39005
24/07/02 15:21:39 DEBUG LocalFileSystem: opening file /tmp/beam-artifact-staging/9b228b83e120b5aa87f4ce34788bacdf1c35d2f05311deb8efb494bfbea0ff0b/1-0:go-/tmp/worker-1-1719926483620669554
24/07/02 15:21:40 WARN GrpcLoggingService: Logging client failed unexpectedly.
org.apache.beam.vendor.grpc.v1p60p1.io.grpc.StatusRuntimeException: CANCELLED: client cancelled
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Status.asRuntimeException(Status.java:529)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.stub.ServerCalls$StreamingServerCallHandler$StreamingServerCallListener.onCancel(ServerCalls.java:291)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.PartialForwardingServerCallListener.onCancel(PartialForwardingServerCallListener.java:40)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:23)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onCancel(ForwardingServerCallListener.java:40)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.Contexts$ContextualizedServerCallListener.onCancel(Contexts.java:96)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closedInternal(ServerCallImpl.java:370)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.closed(ServerCallImpl.java:359)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1Closed.runInContext(ServerImpl.java:910)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at org.apache.beam.vendor.grpc.v1p60p1.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
24/07/02 15:21:42 DEBUG AwsRegionProviderChain: Unable to load region from software.amazon.awssdk.regions.providers.InstanceProfileRegionProvider@247a4d99:Unable to contact EC2 metadata service.
24/07/02 15:21:42 DEBUG LocalDiskShuffleMapOutputWriter: Writing shuffle index file for mapId 1 with length 8
24/07/02 15:21:42 DEBUG IndexShuffleBlockResolver: Shuffle index for mapId 1: [0,0,0,0,0,0,0,0]
24/07/02 15:21:42 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 6019 bytes result sent to driver
24/07/02 15:21:42 DEBUG ExecutorMetricsPoller: stageTCMP: (0, 0) -> 1
24/07/02 15:21:44 INFO DockerEnvironmentFactory: Still waiting for startup of environment apache/beam_go_sdk:2.56.0 for worker id 1-1
24/07/02 15:21:44 ERROR DockerEnvironmentFactory: Docker container 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4 logs:
2024/07/02 13:21:39 Provision info:
pipeline_options:{fields:{key:"beam:option:app_name:v1"  value:{string_value:"go-job-1-1719926483620667092"}}  fields:{key:"beam:option:experiments:v1"  value:{list_value:{values:{string_value:"beam_fn_api"}}}}  fields:{key:"beam:option:go_options:v1"  value:{struct_value:{fields:{key:"options"  value:{struct_value:{fields:{key:"endpoint"  value:{string_value:"localhost:8099"}}  fields:{key:"hookOrder"  value:{string_value:"[\"default_remote_logging\"]"}}  fields:{key:"hooks"  value:{string_value:"{\"default_remote_logging\":null}"}}  fields:{key:"job"  value:{string_value:"wordcount"}}  fields:{key:"runner"  value:{string_value:"spark"}}}}}}}}  fields:{key:"beam:option:job_name:v1"  value:{string_value:"go0job0101719926483620667092-root-0702132126-ff3f12ba"}}  fields:{key:"beam:option:options_id:v1"  value:{number_value:2}}  fields:{key:"beam:option:parallelism:v1"  value:{number_value:-1}}  fields:{key:"beam:option:retain_docker_containers:v1"  value:{bool_value:false}}  fields:{key:"beam:option:runner:v1"  value:{null_value:NULL_VALUE}}  fields:{key:"beam:option:spark_master:v1"  value:{string_value:"spark://localhost:7077"}}}  retrieval_token:"go-job-1-1719926483620667092_8d8b0d53-0d18-49dc-908b-a85d0be89cc5"  logging_endpoint:{url:"localhost:36449"}  artifact_endpoint:{url:"localhost:36373"}  control_endpoint:{url:"localhost:43091"}  dependencies:{type_urn:"beam:artifact:type:file:v1"  type_payload:"\n\x84\x01/tmp/beam-artifact-staging/9b228b83e120b5aa87f4ce34788bacdf1c35d2f05311deb8efb494bfbea0ff0b/1-0:go-/tmp/worker-1-1719926483620669554"  role_urn:"beam:artifact:role:go_worker_binary:v1"}  runner_capabilities:"beam:protocol:control_response_elements_embedding:v1"
2024/07/02 13:21:40 Downloaded: /tmp/staged/1-worker-1-1719926483620669554 (sha256: 89580cb558dbc92138c20bdb88f8687d7c96386e9f6d0b717b07b68fe9327476, size: 122860883)
24/07/02 15:21:44 ERROR Executor: Exception in task 7.0 in stage 0.0 (TID 7)
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.UncheckedExecutionException: java.lang.IllegalStateException: No container running for id 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2086)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.get(LocalCache.java:4012)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache.getOrLoad(LocalCache.java:4035)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.get(LocalCache.java:5013)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:5020)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:458)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.<init>(DefaultJobBundleFactory.java:443)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:310)
    at org.apache.beam.runners.fnexecution.control.DefaultExecutableStageContext.getStageBundleFactory(DefaultExecutableStageContext.java:38)
    at org.apache.beam.runners.fnexecution.control.ReferenceCountingExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingExecutableStageContextFactory.java:207)
    at org.apache.beam.runners.spark.translation.SparkExecutableStageFunction.call(SparkExecutableStageFunction.java:142)
    at org.apache.beam.runners.spark.translation.SparkExecutableStageFunction.call(SparkExecutableStageFunction.java:81)
    at org.apache.spark.api.java.JavaRDDLike.$anonfun$mapPartitions$1(JavaRDDLike.scala:153)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:863)
    at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:863)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
    at org.apache.spark.scheduler.Task.run(Task.scala:131)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.IllegalStateException: No container running for id 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4
    at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:137)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:259)
    at org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:232)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3571)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2313)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2190)
    at org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2080)
    ... 39 more
    Suppressed: java.io.IOException: Received exit code 1 for command 'docker kill 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4'. stderr: Error response from daemon: cannot kill container: 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4: container 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4 is not running
        at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:255)
        at org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:181)
        at org.apache.beam.runners.fnexecution.environment.DockerCommand.killContainer(DockerCommand.java:161)
        at org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:161)
        ... 45 more

Most interesting lines being

24/07/02 15:21:44 INFO DockerEnvironmentFactory: Still waiting for startup of environment apache/beam_go_sdk:2.56.0 for worker id 1-1
24/07/02 15:21:44 ERROR DockerEnvironmentFactory: Docker container 1464722e38b1eba5d50a3f4a7f8036c7ae03d524abcb3f0a5e868a73f5b51fc4 logs:

1 Answer 1

0

Seems like the issue was with the way I structured the code in Go as well as the way of how I wrote it. Neither Beam, Spark nor Docker were the problem in this case.

Not the answer you're looking for? Browse other questions tagged or ask your own question.