deadlock #2

Closed

mh-cbon opened this issue Mar 11, 2021 · 23 comments
Labels: bug, enhancement, good first issue, question

Comments

mh-cbon commented Mar 11, 2021

hey,

I believe there is a deadlock in your code.

I was running benchmarks to measure and compare your code with mine.

I was hit with:

$ go test -bench=. -benchmem
goos: linux
goarch: amd64
pkg: test/concur
BenchmarkMe-4     	    4653	    234443 ns/op	   10216 B/op	     205 allocs/op
BenchmarkOrig-4   	fatal error: all goroutines are asleep - deadlock!

goroutine 1 [chan receive]:
testing.(*B).doBench(0xc000126480, 0x0, 0x0, 0x0, 0x0, 0x0, 0x0)
	/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:277 +0x73
testing.(*benchContext).processBench(0xc00000c0e0, 0xc000126480)
	/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:570 +0x218
testing.(*B).run(0xc000126480)
	/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:268 +0x65
testing.(*B).Run(0xc000126000, 0x548ce9, 0xd, 0x551778, 0x4c4300)
	/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:655 +0x41b
testing.runBenchmarks.func1(0xc000126000)
	/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:534 +0x78
testing.(*B).runN(0xc000126000, 0x1)
	/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:191 +0xeb
testing.runBenchmarks(0x548814, 0xb, 0xc00000c0c0, 0x623060, 0x2, 0x2, 0x62aa60)
	/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:540 +0x3b5
testing.(*M).Run(0xc000114000, 0x0)
	/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/testing.go:1363 +0x56a
main.main()
	_testmain.go:45 +0x138

goroutine 66550 [chan receive]:
test/concur.runOrig()
	/home/mh-cbon/gow/src/test/concur/main_test.go:59 +0x195
test/concur.BenchmarkOrig(0xc000126480)
	/home/mh-cbon/gow/src/test/concur/main_test.go:73 +0x2b
testing.(*B).runN(0xc000126480, 0x64)
	/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:191 +0xeb
testing.(*B).launch(0xc000126480)
	/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:321 +0xea
created by testing.(*B).doBench
	/home/mh-cbon/.gvm/gos/go1.15.2/src/testing/benchmark.go:276 +0x55

goroutine 66622 [chan receive]:
test/concur.runOrig.func1(0xc0002b2060, 0xc0000284e0)
	/home/mh-cbon/gow/src/test/concur/main_test.go:44 +0x45
created by test/concur.runOrig
	/home/mh-cbon/gow/src/test/concur/main_test.go:41 +0xdc

goroutine 66621 [chan send]:
test/concur/orig.Process.func1(0xc000016160, 0xc0002b2060, 0x551790, 0xc0001d0018)
	/home/mh-cbon/gow/src/test/concur/orig/lib.go:112 +0x285
created by test/concur/orig.Process
	/home/mh-cbon/gow/src/test/concur/orig/lib.go:36 +0xaf

goroutine 66610 [select (no cases)]:
test/concur/orig.Process.func1.1(0xc0001d0010, 0xc0002b2000, 0xc0000283c0)
	/home/mh-cbon/gow/src/test/concur/orig/lib.go:52 +0x2cf
created by test/concur/orig.Process.func1
	/home/mh-cbon/gow/src/test/concur/orig/lib.go:47 +0x14c

goroutine 66549 [chan send]:
test/concur.runOrig.func1(0xc00013c000, 0xc000028240)
	/home/mh-cbon/gow/src/test/concur/main_test.go:49 +0x66
created by test/concur.runOrig
	/home/mh-cbon/gow/src/test/concur/main_test.go:41 +0xdc

goroutine 66623 [chan receive]:
test/concur/orig.Process.func1.1(0xc0001d0020, 0xc0002b2060, 0xc000028600)
	/home/mh-cbon/gow/src/test/concur/orig/lib.go:52 +0x1c5
created by test/concur/orig.Process.func1
	/home/mh-cbon/gow/src/test/concur/orig/lib.go:47 +0x14c

goroutine 66609 [chan send]:
test/concur.runOrig.func1(0xc0002b2000, 0xc0000282a0)
	/home/mh-cbon/gow/src/test/concur/main_test.go:49 +0x66
created by test/concur.runOrig
	/home/mh-cbon/gow/src/test/concur/main_test.go:41 +0xdc

goroutine 66574 [select (no cases)]:
test/concur/orig.Process.func1.1(0xc0001d0000, 0xc00013c000, 0xc0001ce0c0)
	/home/mh-cbon/gow/src/test/concur/orig/lib.go:52 +0x2cf
created by test/concur/orig.Process.func1
	/home/mh-cbon/gow/src/test/concur/orig/lib.go:47 +0x14c
exit status 2
FAIL	test/concur	2.887s

My test code is:

package main

import (
	"math/rand"
	"test/concur/me"
	"test/concur/orig"
	"testing"
	"time"
)

var max = 100
var poolSize = 10
var outputLen = 10

var _ = time.Millisecond
var _ = rand.Intn

func runMe() {
	input := make(chan func() interface{})
	output := me.Process(input, me.Options{PoolSize: poolSize, OutChannelBuffer: outputLen})
	go func() {
		for work := 0; work < max; work++ {
			value := work
			input <- func() interface{} {
				// time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
				return value * 2
			}
		}
		close(input)
	}()
	for val := range output {
		// fmt.Println(val)
		_ = val
	}
}

func runOrig() {
	inputChan := make(chan *orig.OrderedInput)
	doneChan := make(chan bool)
	outChan := orig.Process(inputChan, workFn, &orig.Options{PoolSize: poolSize, OutChannelBuffer: outputLen})
	go func() {
		for {
			select {
			case out, chok := <-outChan:
				if chok {
					// log.Println(out.Value)
					_ = out
				} else {
					doneChan <- true
				}
			}
		}
	}()
	for work := 0; work < max; work++ {
		input := &orig.OrderedInput{Value: work}
		inputChan <- input
	}
	close(inputChan)
	<-doneChan
}
func workFn(val interface{}) interface{} {
	// time.Sleep(time.Millisecond * time.Duration(rand.Intn(1000)))
	return val.(int)
}

func BenchmarkMe(b *testing.B) {
	for i := 0; i < b.N; i++ {
		runMe()
	}
}
func BenchmarkOrig(b *testing.B) {
	for i := 0; i < b.N; i++ {
		runOrig()
	}
}

As you can see, runOrig is very much like the main you demonstrated on Stack Overflow.

Note that I commented out the sleep and print instructions to improve the measurements; they were polluting the results.

The package orig is a local copy-paste of https://github.com/tejzpr/ordered-concurrently/blob/master/main.go; I only renamed the package for my tests.

Though I did not try to debug it; it lacks clarity imho, and that is the reason I ran this test to begin with.

I also tried to run the test with the race detector. Bad news: it appears your code also contains a data race.

$ go test -race -bench=. -benchmem
goos: linux
goarch: amd64
pkg: test/concur
BenchmarkMe-4     	     382	   3095423 ns/op	    8830 B/op	     204 allocs/op
BenchmarkOrig-4   	==================
WARNING: DATA RACE
Read at 0x00c0003fe020 by goroutine 84:
  test/concur/orig.Process.func1.2()
      /home/mh-cbon/gow/src/test/concur/orig/lib.go:87 +0x138

Previous write at 0x00c0003fe020 by goroutine 49:
  test/concur/orig.Process.func1.1()
      /home/mh-cbon/gow/src/test/concur/orig/lib.go:69 +0x3b2

Goroutine 84 (running) created at:
  test/concur/orig.Process.func1()
      /home/mh-cbon/gow/src/test/concur/orig/lib.go:82 +0x234

Goroutine 49 (running) created at:
  test/concur/orig.Process.func1()
      /home/mh-cbon/gow/src/test/concur/orig/lib.go:47 +0x1ba
==================
==================
WARNING: DATA RACE
Read at 0x00c000018108 by goroutine 34:
  internal/race.Read()
      /home/mh-cbon/.gvm/gos/go1.15.2/src/internal/race/race.go:37 +0x206
  sync.(*WaitGroup).Add()
      /home/mh-cbon/.gvm/gos/go1.15.2/src/sync/waitgroup.go:71 +0x219
  test/concur/orig.Process.func1.2()
      /home/mh-cbon/gow/src/test/concur/orig/lib.go:84 +0x84

Previous write at 0x00c000018108 by goroutine 109:
  internal/race.Write()
      /home/mh-cbon/.gvm/gos/go1.15.2/src/internal/race/race.go:41 +0x125
  sync.(*WaitGroup).Wait()
      /home/mh-cbon/.gvm/gos/go1.15.2/src/sync/waitgroup.go:128 +0x126
  test/concur/orig.Process.func1.2()
      /home/mh-cbon/gow/src/test/concur/orig/lib.go:90 +0x186

Goroutine 34 (running) created at:
  test/concur/orig.Process.func1()
      /home/mh-cbon/gow/src/test/concur/orig/lib.go:82 +0x234

Goroutine 109 (running) created at:
  test/concur/orig.Process.func1()
      /home/mh-cbon/gow/src/test/concur/orig/lib.go:82 +0x234
==================
^Csignal: interrupt
FAIL	test/concur	172.097s

Maybe that has to do with the fact that I commented out the sleep instructions, I don't know.

for completeness,

tejzpr (Owner) commented Mar 12, 2021

Thanks for this. I did indeed test with load only; the deadlock occurs when there is no load. The safer implementation would be to use WaitGroups. I have updated the README and the Stack Overflow answer.
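
For illustration only (not the library's actual code, and all names here are made up): the wait-group shape for this kind of pool is to close the output channel only after every worker has returned, so a consumer ranging over the output terminates instead of deadlocking.

package main

import "sync"

// runPool is a generic sketch: a fixed pool of workers drains the input
// channel, and the output channel is closed once all workers have returned.
func runPool(input <-chan int, workerCount int) <-chan int {
	output := make(chan int)
	var wg sync.WaitGroup
	wg.Add(workerCount) // all Add calls happen before any Wait
	for i := 0; i < workerCount; i++ {
		go func() {
			defer wg.Done()
			for v := range input {
				output <- v * 2 // stand-in for the real work function
			}
		}()
	}
	go func() {
		wg.Wait()     // every worker has exited...
		close(output) // ...so it is safe to signal completion
	}()
	return output
}

func main() {
	input := make(chan int)
	go func() {
		for i := 0; i < 5; i++ {
			input <- i
		}
		close(input)
	}()
	for v := range runPool(input, 3) {
		_ = v
	}
}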

tejzpr (Owner) commented Mar 12, 2021

Closing the issue, since the deadlock is not occurring with the updated implementation.

tejzpr closed this as completed Mar 12, 2021
tejzpr reopened this Mar 12, 2021
tejzpr (Owner) commented Mar 12, 2021

@mh-cbon We can continue the discussion here. I am unable to reproduce the race conditions on my machine with the current test cases. I have also added the scenario that caused the deadlock to the test cases, and it is passing without issues. I am unable to use your example code at https://play.golang.org/p/2o4W_BgaC4t for this library because it is more resource intensive than the current implementation.

mh-cbon (Author) commented Mar 12, 2021

Even if you can't reproduce the race on your side, reading the stack trace makes the problems clear.

See:

Read at 0x00c000018108 by goroutine 34:
...
  test/concur/orig.Process.func1.2()
      /home/mh-cbon/gow/src/test/concur/orig/lib.go:84 +0x84

Previous write at 0x00c000018108 by goroutine 109:
...
  test/concur/orig.Process.func1.2()
      /home/mh-cbon/gow/src/test/concur/orig/lib.go:90 +0x186

I removed the irrelevant frames; this corresponds to https://github.com/tejzpr/ordered-concurrently/blob/master/main.go#L90 and https://github.com/tejzpr/ordered-concurrently/blob/master/main.go#L84

If you check the documentation, https://golang.org/pkg/sync/#WaitGroup: "If a WaitGroup is reused to wait for several independent sets of events, new Add calls must happen after all previous Wait calls have returned."
This is a clear warning about what to care about when reusing WaitGroups.
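
To illustrate that rule with a generic sketch (not this library's code; the batch sizes and names are made up): Add for a new set of events must not run while another goroutine may still be blocked in Wait for the previous set.

package main

import "sync"

// processBatch starts `size` goroutines and waits for them; Add happens
// before Wait within one batch, and Wait returns before the WaitGroup is
// reused for the next batch.
func processBatch(wg *sync.WaitGroup, size int) {
	wg.Add(size)
	for i := 0; i < size; i++ {
		go func() {
			defer wg.Done()
			// ... work ...
		}()
	}
	wg.Wait()
}

func main() {
	var wg sync.WaitGroup

	// Safe reuse: the next Add only happens after the previous Wait returned.
	processBatch(&wg, 10)
	processBatch(&wg, 10)

	// Racy reuse (conceptually what the detector flagged above): two batches
	// sharing the WaitGroup concurrently, so Add races with Wait.
	// go processBatch(&wg, 10)
	// go processBatch(&wg, 10)
}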

For the second race, see:

Read at 0x00c0003fe020 by goroutine 84:
  test/concur/orig.Process.func1.2()
      /home/mh-cbon/gow/src/test/concur/orig/lib.go:87 +0x138

Previous write at 0x00c0003fe020 by goroutine 49:
  test/concur/orig.Process.func1.1()
      /home/mh-cbon/gow/src/test/concur/orig/lib.go:69 +0x3b2

https://github.com/tejzpr/ordered-concurrently/blob/master/main.go#L87
https://github.com/tejzpr/ordered-concurrently/blob/master/main.go#L69

Two goroutines are modifying a shared value without synchronization: a data race.
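
The usual fixes are to guard the shared value with a mutex, or to confine it to a single goroutine and communicate over a channel. A generic mutex version (not a patch for this library; the counter is just an illustration) looks like this:

package main

import "sync"

// counter is a value written by several goroutines; every reader and writer
// takes the same lock, so the race detector stays quiet.
type counter struct {
	mu sync.Mutex
	n  int
}

func (c *counter) inc() {
	c.mu.Lock()
	c.n++
	c.mu.Unlock()
}

func (c *counter) value() int {
	c.mu.Lock()
	defer c.mu.Unlock()
	return c.n
}

func main() {
	var c counter
	var wg sync.WaitGroup
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			c.inc()
		}()
	}
	wg.Wait()
	_ = c.value()
}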

Concerning the deadlock, it is tightly related to your logic. To me it is obvious it has problems, though to prove it I would have to fully understand it, and to be honest I was pretty lazy on that aspect of the work.
So anyway, from experience I knew another, cleaner, more idiomatic solution existed.

... it is more resource intensive than the current implementation.

The question is: can you prove it? That is basically the question I was about to answer when I hit those issues.

tejzpr (Owner) commented Mar 12, 2021

Thanks for the ref, I'll try to fix the code and remove the race conditions.
Regarding your code, I see there are sort calls in Process, which I would like to avoid. If you would like to optimize the implementation you are welcome to create a PR.

mh-cbon (Author) commented Mar 12, 2021

CPUs are fast, memory is slow, most of the time. Consider that using a map allocates on the heap. Plus I am not entirely sure it frees up the underlying data; it might be more subtle than that, see golang/go#20135.
In any case, a simple win in your implementation is to pre-allocate that map with an initial capacity greater than 0: outputMap := make(map[uint64]*processInput, 20). (The slice-style form make(map[uint64]*processInput, 0, 20) does not compile; the two-argument length/capacity form only applies to slices.)

tejzpr (Owner) commented Mar 12, 2021

The initial requirement I set while implementing this package was that the input size to the worker function is unknown (e.g. input from a Kafka queue), which puts predefining the map size out of the scope of the implementation.

mh-cbon (Author) commented Mar 12, 2021

You don't get it. It does not fix the size of the map; it pre-allocates space so the first ~20 items do not allocate on the fly. It's a big win in many scenarios. As this is a limitless worker, it is even more important to care about that aspect of the implementation, because maps do not shrink automatically after a call to delete. Anyway, maybe it's good enough like this: https://www.youtube.com/watch?v=rLSFy23HAPQ
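
To make the point concrete, a throwaway micro-benchmark (a hypothetical _test.go file, not part of the library) comparing a map built with and without a size hint; with the hint, the first inserts do not have to grow the map incrementally.

package main

import "testing"

// Without a hint, the map starts empty and grows (allocating new buckets)
// while the first items are inserted.
func BenchmarkMapNoHint(b *testing.B) {
	for i := 0; i < b.N; i++ {
		m := make(map[uint64]int)
		for k := uint64(0); k < 20; k++ {
			m[k] = int(k)
		}
	}
}

// With a hint, space for roughly 20 entries is pre-allocated up front.
func BenchmarkMapHint(b *testing.B) {
	for i := 0; i < b.N; i++ {
		m := make(map[uint64]int, 20)
		for k := uint64(0); k < 20; k++ {
			m[k] = int(k)
		}
	}
}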

tejzpr (Owner) commented Mar 12, 2021

I'll take your recommendations for the map size and update the code for the next version.

tejzpr (Owner) commented Mar 12, 2021

I have fixed the possible race conditions / deadlocks that you helped identify with commit a8addfe. Thank you! 😊
Please do review it; your feedback is much appreciated.

tejzpr self-assigned this Mar 13, 2021
mh-cbon (Author) commented Mar 13, 2021

Neat, I have been able to complete my testing, thanks.

$ go test -race -bench=. -benchmem
goos: linux
goarch: amd64
pkg: test/concur
BenchmarkMe-4     	      84	  14206521 ns/op	   60227 B/op	    1976 allocs/op
BenchmarkOrig-4   	      74	  17267334 ns/op	   77919 B/op	    4501 allocs/op
PASS
ok  	test/concur	3.258s
tejzpr (Owner) commented Mar 13, 2021

Closing, changes released in v1.0.12

tejzpr closed this as completed Mar 13, 2021
mh-cbon (Author) commented Mar 13, 2021

This test suite is even more interesting: it executes only one op, but over a big max value.
https://play.golang.org/p/vHVEtzXsil9

$ go test -bench=. -benchmem
goos: linux
goarch: amd64
pkg: test/concur
BenchmarkMe-4     	      73	  14600672 ns/op	 1246658 B/op	   31568 allocs/op
BenchmarkOrig-4   	       1	1277338144 ns/op	80040736 B/op	 4999532 allocs/op
PASS
ok  	test/concur	15.660s
mh-cbon (Author) commented Mar 13, 2021

To wrap this up: after some more digging it is possible to improve the code further.

$ go test -bench=. -benchmem 
goos: linux
goarch: amd64
pkg: test/concur
BenchmarkMeManyOps-4     	    4918	    225078 ns/op	    5765 B/op	      45 allocs/op
BenchmarkMe2ManyOps-4    	    4902	    234386 ns/op	   10203 B/op	     205 allocs/op
BenchmarkOrigManyOps-4   	    4677	    265062 ns/op	    7911 B/op	     313 allocs/op
BenchmarkMeOneOp-4       	       1	2288020932 ns/op	50869008 B/op	 2606393 allocs/op
BenchmarkMe2OneOp-4      	       1	2425049742 ns/op	99596640 B/op	 3005669 allocs/op
BenchmarkOrigOneOp-4     	       1	2531154559 ns/op	80024624 B/op	 4999531 allocs/op
PASS
ok  	test/concur	10.856s

The modified lib is https://play.golang.org/p/QKNVE3XEfrF and the test is https://play.golang.org/p/5nwbDiUF6q8

It avoids allocating when pushing to the input chan. Previously a function was created for each work item; the new version takes advantage of a struct and an interface to reduce allocations.
The internal channels are unbuffered; I noticed it allocates less and is also faster, but this is not grounded in science, just observation, and it might change if the runtime is modified.

tejzpr (Owner) commented Mar 13, 2021

Reopening this again :)
I did try out your implementation and also created another implementation using Go's container/heap; both are linked below.
My Go version is 1.16.2
Your implementation: https://github.com/tejzpr/ordered-concurrently/tree/workfunction-implementation-mh-cbon

go test -bench=. -benchmem 
goos: darwin
goarch: amd64
pkg: github.com/tejzpr/ordered-concurrently
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkOC-12                 1        1141522718 ns/op        112458480 B/op   4008232 allocs/op
PASS
ok      github.com/tejzpr/ordered-concurrently  1.852s

Implementation using heap: https://github.com/tejzpr/ordered-concurrently/tree/heap-workfunction-implementation

╰─ go test -bench=. -benchmem 
goos: darwin
goarch: amd64
pkg: github.com/tejzpr/ordered-concurrently
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkOC-12                 1        1195411900 ns/op        85212040 B/op    2999656 allocs/op
PASS
ok      github.com/tejzpr/ordered-concurrently  1.950s

Map based implementation: https://github.com/tejzpr/ordered-concurrently/tree/workfunction-implementation

go test -bench=. -benchmem 
goos: darwin
goarch: amd64
pkg: github.com/tejzpr/ordered-concurrently
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkOC-12                 1        1248661758 ns/op        40010344 B/op    2999549 allocs/op
PASS
ok      github.com/tejzpr/ordered-concurrently  1.953s

I am not sure why I am unable to reproduce your numbers; it might be due to OS-specific optimizations.

I have also tried the reduced-allocation runner and it does reduce allocations significantly.

Reduced allocs: https://github.com/tejzpr/ordered-concurrently/tree/workfunction-implementation-allocs

╰─ go test -bench=. -benchmem 
goos: darwin
goarch: amd64
pkg: github.com/tejzpr/ordered-concurrently
cpu: Intel(R) Core(TM) i7-9750H CPU @ 2.60GHz
BenchmarkOC-12                 1        1207667776 ns/op        16017816 B/op    1999698 allocs/op
PASS
ok      github.com/tejzpr/ordered-concurrently  1.348s
tejzpr reopened this Mar 13, 2021
mh-cbon (Author) commented Mar 13, 2021

Interesting results:

$ go test -bench=. -benchmem -count=10
goos: linux
goarch: amd64
pkg: test/concur
BenchmarkMeManyOps-4     	   12090	     96156 ns/op	    5836 B/op	      47 allocs/op
BenchmarkMeManyOps-4     	   12486	     98514 ns/op	    5842 B/op	      47 allocs/op
BenchmarkMeManyOps-4     	   12499	     96615 ns/op	    5837 B/op	      47 allocs/op
BenchmarkMeManyOps-4     	   12446	     96074 ns/op	    5847 B/op	      48 allocs/op
BenchmarkMeManyOps-4     	   12403	     96309 ns/op	    5841 B/op	      47 allocs/op
BenchmarkMeManyOps-4     	   12441	     96426 ns/op	    5834 B/op	      47 allocs/op
BenchmarkMeManyOps-4     	   12414	     96561 ns/op	    5833 B/op	      47 allocs/op
BenchmarkMeManyOps-4     	   12442	     96650 ns/op	    5840 B/op	      47 allocs/op
BenchmarkMeManyOps-4     	   12308	     96545 ns/op	    5840 B/op	      47 allocs/op
BenchmarkMeManyOps-4     	   12412	     96901 ns/op	    5829 B/op	      47 allocs/op
BenchmarkMe2ManyOps-4    	   10000	    117951 ns/op	   10315 B/op	     205 allocs/op
BenchmarkMe2ManyOps-4    	    9439	    121084 ns/op	   10309 B/op	     205 allocs/op
BenchmarkMe2ManyOps-4    	   10000	    120271 ns/op	   10309 B/op	     205 allocs/op
BenchmarkMe2ManyOps-4    	    9280	    120965 ns/op	   10314 B/op	     205 allocs/op
BenchmarkMe2ManyOps-4    	    9322	    120287 ns/op	   10306 B/op	     205 allocs/op
BenchmarkMe2ManyOps-4    	   10000	    120144 ns/op	   10307 B/op	     205 allocs/op
BenchmarkMe2ManyOps-4    	    9708	    121319 ns/op	   10315 B/op	     205 allocs/op
BenchmarkMe2ManyOps-4    	    9799	    120065 ns/op	   10307 B/op	     205 allocs/op
BenchmarkMe2ManyOps-4    	   10000	    121023 ns/op	   10316 B/op	     205 allocs/op
BenchmarkMe2ManyOps-4    	    9664	    121331 ns/op	   10308 B/op	     205 allocs/op
BenchmarkHeapManyOps-4   	   10000	    103815 ns/op	    5018 B/op	     120 allocs/op
BenchmarkHeapManyOps-4   	   10000	    103754 ns/op	    5019 B/op	     120 allocs/op
BenchmarkHeapManyOps-4   	   10000	    104277 ns/op	    5015 B/op	     120 allocs/op
BenchmarkHeapManyOps-4   	   10000	    104197 ns/op	    5017 B/op	     120 allocs/op
BenchmarkHeapManyOps-4   	   10000	    103538 ns/op	    5020 B/op	     120 allocs/op
BenchmarkHeapManyOps-4   	   10000	    103812 ns/op	    5018 B/op	     120 allocs/op
BenchmarkHeapManyOps-4   	   10000	    104004 ns/op	    5019 B/op	     120 allocs/op
BenchmarkHeapManyOps-4   	   10000	    103530 ns/op	    5018 B/op	     120 allocs/op
BenchmarkHeapManyOps-4   	   10000	    104144 ns/op	    5019 B/op	     120 allocs/op
BenchmarkHeapManyOps-4   	   10000	    102792 ns/op	    5017 B/op	     120 allocs/op
BenchmarkOrigManyOps-4   	    9349	    138729 ns/op	    7914 B/op	     312 allocs/op
BenchmarkOrigManyOps-4   	    8592	    141700 ns/op	    7908 B/op	     313 allocs/op
BenchmarkOrigManyOps-4   	    8732	    141449 ns/op	    7926 B/op	     313 allocs/op
BenchmarkOrigManyOps-4   	    8811	    142396 ns/op	    7910 B/op	     313 allocs/op
BenchmarkOrigManyOps-4   	    8714	    140649 ns/op	    7922 B/op	     313 allocs/op
BenchmarkOrigManyOps-4   	    8412	    137935 ns/op	    7934 B/op	     312 allocs/op
BenchmarkOrigManyOps-4   	    8437	    137770 ns/op	    7949 B/op	     313 allocs/op
BenchmarkOrigManyOps-4   	    8326	    141114 ns/op	    7876 B/op	     313 allocs/op
BenchmarkOrigManyOps-4   	    8488	    142828 ns/op	    7965 B/op	     313 allocs/op
BenchmarkOrigManyOps-4   	    8368	    142383 ns/op	    7875 B/op	     313 allocs/op
BenchmarkMeOneOp-4       	       1	1217634077 ns/op	48937840 B/op	 2576560 allocs/op
BenchmarkMeOneOp-4       	       1	1066079091 ns/op	44409056 B/op	 2498899 allocs/op
BenchmarkMeOneOp-4       	       1	1131739344 ns/op	46993072 B/op	 2542803 allocs/op
BenchmarkMeOneOp-4       	       1	1105588553 ns/op	43514304 B/op	 2483984 allocs/op
BenchmarkMeOneOp-4       	       1	1159698387 ns/op	45305008 B/op	 2514523 allocs/op
BenchmarkMeOneOp-4       	       1	1102728723 ns/op	42747808 B/op	 2470007 allocs/op
BenchmarkMeOneOp-4       	       1	1133125928 ns/op	43897024 B/op	 2489221 allocs/op
BenchmarkMeOneOp-4       	       1	1114841835 ns/op	43744976 B/op	 2486851 allocs/op
BenchmarkMeOneOp-4       	       1	1158303452 ns/op	45692864 B/op	 2521166 allocs/op
BenchmarkMeOneOp-4       	       1	1086512877 ns/op	43203328 B/op	 2478737 allocs/op
BenchmarkMe2OneOp-4      	       1	1224063003 ns/op	101528448 B/op	 3003246 allocs/op
BenchmarkMe2OneOp-4      	       1	1235739961 ns/op	101255728 B/op	 3003141 allocs/op
BenchmarkMe2OneOp-4      	       1	1236977805 ns/op	100975568 B/op	 3003254 allocs/op
BenchmarkMe2OneOp-4      	       1	1241012212 ns/op	101244640 B/op	 3003316 allocs/op
BenchmarkMe2OneOp-4      	       1	1236305621 ns/op	101306912 B/op	 3003787 allocs/op
BenchmarkMe2OneOp-4      	       1	1247136974 ns/op	101222880 B/op	 3002974 allocs/op
BenchmarkMe2OneOp-4      	       1	1229842563 ns/op	101232688 B/op	 3003340 allocs/op
BenchmarkMe2OneOp-4      	       1	1236317217 ns/op	101181968 B/op	 3003569 allocs/op
BenchmarkMe2OneOp-4      	       1	1230941024 ns/op	101325520 B/op	 3003681 allocs/op
BenchmarkMe2OneOp-4      	       1	1227969884 ns/op	101243664 B/op	 3004011 allocs/op
BenchmarkHeapOneOp-4     	       1	1119069052 ns/op	69189688 B/op	 1999934 allocs/op
BenchmarkHeapOneOp-4     	       1	1104706084 ns/op	69188728 B/op	 1999924 allocs/op
BenchmarkHeapOneOp-4     	       1	1113601005 ns/op	69188728 B/op	 1999924 allocs/op
BenchmarkHeapOneOp-4     	       1	1111160432 ns/op	69188712 B/op	 1999924 allocs/op
BenchmarkHeapOneOp-4     	       1	1113942712 ns/op	69190648 B/op	 1999929 allocs/op
BenchmarkHeapOneOp-4     	       1	1114091624 ns/op	69189000 B/op	 1999927 allocs/op
BenchmarkHeapOneOp-4     	       1	1117396249 ns/op	69188712 B/op	 1999924 allocs/op
BenchmarkHeapOneOp-4     	       1	1110596276 ns/op	69188712 B/op	 1999924 allocs/op
BenchmarkHeapOneOp-4     	       1	1108004644 ns/op	69188744 B/op	 1999924 allocs/op
BenchmarkHeapOneOp-4     	       1	1113978825 ns/op	69188728 B/op	 1999924 allocs/op
BenchmarkOrigOneOp-4     	       1	1325677542 ns/op	80017440 B/op	 4999508 allocs/op
BenchmarkOrigOneOp-4     	       1	1327616653 ns/op	80017856 B/op	 4999511 allocs/op
BenchmarkOrigOneOp-4     	       1	1332561131 ns/op	80018000 B/op	 4999512 allocs/op
BenchmarkOrigOneOp-4     	       1	1342211905 ns/op	80008048 B/op	 4999513 allocs/op
BenchmarkOrigOneOp-4     	       1	1333226969 ns/op	80007328 B/op	 4999508 allocs/op
BenchmarkOrigOneOp-4     	       1	1333065161 ns/op	80041792 B/op	 4999536 allocs/op
BenchmarkOrigOneOp-4     	       1	1335215898 ns/op	80018128 B/op	 4999513 allocs/op
BenchmarkOrigOneOp-4     	       1	1335089527 ns/op	80018288 B/op	 4999514 allocs/op
BenchmarkOrigOneOp-4     	       1	1329021838 ns/op	80017696 B/op	 4999510 allocs/op
BenchmarkOrigOneOp-4     	       1	1496456811 ns/op	80171008 B/op	 4999582 allocs/op
PASS
ok  	test/concur	109.888s

The heap version is slightly slower in all cases and allocates more in the ManyOps case (120 vs 47 allocs/op), but with one op and a big number of values it allocates fewer times; 500K allocations are removed, which is significant. Though, I cannot really explain it.

mh-cbon (Author) commented Mar 13, 2021

On a pure code-review note, I don't understand what this channel does: https://github.com/tejzpr/ordered-concurrently/blob/workfunction-implementation/main.go#L104

This definition is problematic to me: https://github.com/tejzpr/ordered-concurrently/blob/workfunction-implementation-mh-cbon/main.go#L9
The reason is that having a struct with a func member forces the caller to instantiate a function, i.e. allocate. This is the last optimization I added: Process takes a chan of Producers (type Producer interface { Produce() interface{} }), so the caller can instantiate a struct value implementing that interface, as sketched below.
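
A minimal sketch of that Producer-based input, assuming only the interface shape quoted above (the producer type and the doubling work are placeholders, not the exact playground code):

package main

import "fmt"

// Producer is the input contract: each work item knows how to produce its
// own result.
type Producer interface {
	Produce() interface{}
}

// producer is a plain value type implementing Producer; no per-item closure
// has to be allocated to carry the work.
type producer int

func (p producer) Produce() interface{} {
	return int(p) * 2 // placeholder work
}

func main() {
	inputChan := make(chan Producer)
	go func() {
		for work := 0; work < 5; work++ {
			inputChan <- producer(work)
		}
		close(inputChan)
	}()
	for item := range inputChan {
		fmt.Println(item.Produce())
	}
}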

Because the map does not shrink automatically, this version must include a check of the map capacity in order to re-create it when the difference with len exceeds some ratio:
https://github.com/tejzpr/ordered-concurrently/blob/workfunction-implementation-allocs/main.go#L50
Otherwise, with enough time and usage, you will end up OOM.
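
The idea, roughly (a sketch with assumed names, not the code at that link): track how many keys were ever inserted and, once that count greatly exceeds the live length, copy the survivors into a fresh map so the oversized buckets can be collected. Go maps keep their bucket memory after delete (golang/go#20135).

package main

const shrinkRatio = 4

// maybeRebuild returns a fresh map holding only the live entries once the
// insert count drifts too far from the live length. All names are
// illustrative.
func maybeRebuild(buf map[uint64]interface{}, inserted *int) map[uint64]interface{} {
	if *inserted < 1024 || *inserted < shrinkRatio*len(buf) {
		return buf // not worth rebuilding yet
	}
	fresh := make(map[uint64]interface{}, len(buf))
	for k, v := range buf {
		fresh[k] = v // keep only the live entries
	}
	*inserted = len(fresh)
	return fresh
}

func main() {
	buf := make(map[uint64]interface{}, 20)
	inserted := 0
	for i := uint64(0); i < 10000; i++ {
		buf[i] = i
		inserted++
		delete(buf, i) // simulate the ordered buffer draining
		buf = maybeRebuild(buf, &inserted)
	}
}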

While using a heap seems a good idea, in practice it behaves badly imho. Maybe it is possible to generate a heap implementation specialized for []processInput (see the sketch below), which, I believe, would greatly improve its performance. Though I must say that using a slice is simple, efficient, reuses memory well, and is idiomatic.
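
For reference, a container/heap over processInput values ordered by arrival index could look roughly like this (the processInput fields are assumed, not the library's exact definition):

package main

import (
	"container/heap"
	"fmt"
)

// processInput is an assumed shape: a work result tagged with the order in
// which its input arrived.
type processInput struct {
	order uint64
	value interface{}
}

// inputHeap is a min-heap keyed by arrival order, so the smallest
// outstanding order is always at the root and can be emitted next.
type inputHeap []*processInput

func (h inputHeap) Len() int            { return len(h) }
func (h inputHeap) Less(i, j int) bool  { return h[i].order < h[j].order }
func (h inputHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *inputHeap) Push(x interface{}) { *h = append(*h, x.(*processInput)) }
func (h *inputHeap) Pop() interface{} {
	old := *h
	n := len(old)
	item := old[n-1]
	*h = old[:n-1]
	return item
}

func main() {
	h := &inputHeap{}
	heap.Init(h)
	// Results arrive out of order...
	heap.Push(h, &processInput{order: 2, value: "c"})
	heap.Push(h, &processInput{order: 0, value: "a"})
	heap.Push(h, &processInput{order: 1, value: "b"})
	// ...but pop in input order.
	for h.Len() > 0 {
		fmt.Println(heap.Pop(h).(*processInput).value)
	}
}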

tejzpr (Owner) commented Mar 13, 2021

The channel at https://github.com/tejzpr/ordered-concurrently/blob/workfunction-implementation/main.go#L104 is an artifact from a previous implementation; I'll remove it in v2.

I have already removed it in the Workerfunc interface implementation; still, the benchmark difference doesn't seem large enough.

I'll spend some time over the weekend and come to a conclusion.

mh-cbon (Author) commented Mar 13, 2021

Still reviewing; I noticed the tests were not checking that the output is sorted as expected.

Here is what you could add:

func TestSortedData(t *testing.T) {
	t.Run("Test without workgroup", func(t *testing.T) {
		max := 10
		inputChan := make(chan Producer)
		output := Process(inputChan, &Options{PoolSize: 10, OutChannelBuffer: 10})
		go func() {
			for work := 0; work < max; work++ {
				inputChan <- producer(work)
			}
			close(inputChan)
		}()
		var res []int
		for out := range output {
			res = append(res, out.(int))
		}
		if !sort.IsSorted(sort.IntSlice(res)) {
			t.Error("output is not sorted")
		}
		t.Log("Test without workgroup Completed")
	})
}
mh-cbon (Author) commented Mar 13, 2021

You can also make use of this error: t.Errorf("Invalid output %T", out) (I had some problems with this :/ )

mh-cbon (Author) commented Mar 13, 2021

For the benchmarks I work two ways: over many instances of the job to complete, and over one job with a huge number of entries to see the behavior over a long run (maybe not such a good idea).

func BenchmarkOC(b *testing.B) {
	max := 1000000
	inputChan := make(chan Producer)
	output := Process(inputChan, &Options{PoolSize: 10, OutChannelBuffer: 10})
	go func() {
		for work := 0; work < max; work++ {
			inputChan <- producer(work)
		}
		close(inputChan)
	}()
	for out := range output {
		_ = out
	}
}

func BenchmarkOC2(b *testing.B) {
	for i := 0; i < b.N; i++ {
		max := 100
		inputChan := make(chan Producer)
		output := Process(inputChan, &Options{PoolSize: 10, OutChannelBuffer: 10})
		go func() {
			for work := 0; work < max; work++ {
				inputChan <- producer(work)
			}
			close(inputChan)
		}()
		for out := range output {
			_ = out
		}
	}
}

I slightly updated my version that you checked in: https://play.golang.org/p/YSouN6gLm4H and its tests: https://play.golang.org/p/WW0AK9nJNTG

The results are similar here:

$ go test -bench=. -benchmem
goos: linux
goarch: amd64
pkg: github.com/tejzpr/ordered-concurrently
BenchmarkOC-4    	       1	1070002333 ns/op	80066160 B/op	 3238932 allocs/op
BenchmarkOC2-4   	   12121	     98117 ns/op	    5890 B/op	      49 allocs/op
PASS
ok  	github.com/tejzpr/ordered-concurrently	3.813s
mh-cbon (Author) commented Mar 13, 2021

You can check this version with a specialized heap implementation.

Not a nice API, and added complexity, but it does the job.

test: https://play.golang.org/p/H09V9f2DvU2

main: https://play.golang.org/p/ZE4pLLMo69O

heap: https://play.golang.org/p/vkdGmHKIeIC

It is a neat improvement:

$ go test -bench=. -benchmem
goos: linux
goarch: amd64
pkg: github.com/tejzpr/ordered-concurrently
BenchmarkOC-4    	       2	 954328398 ns/op	16032504 B/op	 1999642 allocs/op
BenchmarkOC2-4   	   12920	     93354 ns/op	     404 B/op	       5 allocs/op
PASS
ok  	github.com/tejzpr/ordered-concurrently	5.568s
tejzpr (Owner) commented Mar 14, 2021

Released v2.0.0. I am favoring the heap implementation over the map implementation because of better memory utilization for larger / long-running use cases of the library, due to golang/go#20135.

tejzpr closed this as completed Mar 14, 2021
tejzpr added the good first issue label Mar 14, 2021
tejzpr added the bug, enhancement, and question labels Mar 14, 2021