0

I am having problems with named pipes. When, say, 30 client pipes all try to connect at the same time to a local pipe server on a 4-core machine, I get a timeout or a semaphore timeout. Sometimes, in the worst case, it takes a full second for one client to get a connection, then one more second for the next, and so on. I thought local pipe access was supposed to be fast. Why would 30 clients (or even 100 clients, which take the same time) need 1000 milliseconds to make just one connection?

using System;
using System.Diagnostics;
using System.IO.Pipes;
using System.Security.AccessControl;
using System.Threading;
using System.Threading.Tasks;

namespace PipeStress
{
    /// <summary>
    /// Named-pipe stress repro: one server loop and 30 client loops, each started
    /// with <see cref="Task.Run(Action)"/> and each built on blocking pipe calls.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Builds the access rules applied to every server pipe instance.
        /// </summary>
        /// <remarks>
        /// Starts from the access control of a short-lived "seed" pipe, then adds
        /// allow rules for the built-in Users group (read/write, create new
        /// instance, change permissions) and for Local Service (read/write).
        /// </remarks>
        /// <returns>The populated <see cref="PipeSecurity"/>.</returns>
        public static PipeSecurity CreatePipeSecurity()
        {
            PipeSecurity ps;
            using (var seedPipe = new NamedPipeServerStream("{DDAB520F-5104-48D1-AA65-7AEF568E0045}",
                PipeDirection.InOut, 1, PipeTransmissionMode.Message, PipeOptions.None, 1000, 1000))
            {
                ps = seedPipe.GetAccessControl();
            }

            var sid = new System.Security.Principal.SecurityIdentifier(
                System.Security.Principal.WellKnownSidType.BuiltinUsersSid, null);

            ps.AddAccessRule(new PipeAccessRule(sid, PipeAccessRights.ReadWrite |
                PipeAccessRights.CreateNewInstance | PipeAccessRights.ChangePermissions,
                AccessControlType.Allow));

            sid = new System.Security.Principal.SecurityIdentifier(
                System.Security.Principal.WellKnownSidType.LocalServiceSid, null);

            ps.AddAccessRule(new PipeAccessRule(sid, PipeAccessRights.ReadWrite,
                AccessControlType.Allow));

            return ps;
        }

        /// <summary>
        /// Starts the server loop and 30 client loops, then waits for Enter.
        /// </summary>
        /// <param name="args">Unused.</param>
        static void Main(string[] args)
        {
            // Every loop started here is made of blocking calls (WaitForConnection,
            // Connect, ReadByte), so each one holds on to the thread that runs it.
            Task.Run(() => RunPipeServer());

            for (var i = (uint) 0; i < 30; i++)
            {
                // Copy the loop variable so each lambda captures its own value.
                var index = i;
                //Thread.Sleep(100);
                Task.Run(() => RunPipeClient(index));
            }

            Console.ReadLine();
        }

        // Pipe name used by both the server instances and the clients.
        private const string PipeName = "{6FDABBF8-BFFD-4624-A67B-3211ED7EF0DC}";

        /// <summary>
        /// Accept loop: creates a server pipe instance, blocks until a client
        /// connects, prints how long that took, and hands the connected pipe to
        /// <see cref="HandleClient"/> on another task.
        /// </summary>
        static void RunPipeServer()
        {
            try
            {
                // The access rules are identical for every instance, so build them
                // once instead of creating and destroying a seed pipe per accept.
                var pipeSecurity = CreatePipeSecurity();
                var stw = new Stopwatch();

                while (true)
                {
                    stw.Restart();

                    var pipeServer = new NamedPipeServerStream(PipeName, PipeDirection.InOut,
                         NamedPipeServerStream.MaxAllowedServerInstances,
                         PipeTransmissionMode.Message, PipeOptions.Asynchronous, 4 * 1024, 4 * 1024,
                         pipeSecurity);
                    try
                    {
                        pipeServer.WaitForConnection();
                        Console.WriteLine(stw.ElapsedMilliseconds + "ms");

                        // pipeServer is declared inside the loop body, so the lambda
                        // captures this iteration's instance.
                        Task.Run(() => HandleClient(pipeServer));
                    }
                    catch (Exception ex)
                    {
                        // Report the failure instead of dropping it, then release the
                        // instance that never reached a handler.
                        Console.WriteLine(ex);
                        pipeServer.Dispose();
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
        }

        /// <summary>
        /// Handles one connected client; currently it only releases the pipe.
        /// </summary>
        /// <param name="pipeServer">The connected server pipe; disposed on return.</param>
        private static void HandleClient(NamedPipeServerStream pipeServer)
        {
            // A single using replaces the nested Close()/Dispose() pair.
            using (pipeServer)
            {
                //Thread.Sleep(100);
            }
        }

        /// <summary>
        /// Client loop: repeatedly connects to the server (5000 ms timeout), prints
        /// the client index and its connection count, then blocks in a one-byte read.
        /// </summary>
        /// <param name="i">Index of this client, used only in the console output.</param>
        static void RunPipeClient(uint i)
        {
            try
            {
                var j = 0;

                while (true)
                {
                    // The using block disposes the client, so no explicit Close() is needed.
                    using (var pipeClient = new NamedPipeClientStream(".", PipeName, PipeDirection.InOut, PipeOptions.None))
                    {
                        //Thread.Sleep(100);

                        pipeClient.Connect(5000);

                        Console.WriteLine($"{i}, {++j}");
                        pipeClient.ReadByte();
                    }
                }
            }
            catch (Exception ex)
            {
                Console.WriteLine(ex);
            }
        }
    }
}

1 Answer 1

3

When adding load to any server, some latency is to be expected. However, in your example, the latency occurs at exactly 1 second intervals, which is both excessive and strikingly orderly. The orderliness is a very big clue as to what's going on. :)

In fact, the latency you are seeing is due to the delay built into the thread pool for the creation of new threads. Another part of the evidence is the fact that, actually, the first few operations complete immediately. The latency only starts happening after that, which coincides exactly with the thread pool having run out of threads, and waiting for the thread pool's throttling to allow a new thread to be created to service the request. This throttling limits creation of new threads to, surprise! :), one per second.

One way to address this is to increase the minimum number of threads in the thread pool, so you have all the threads you need right away. This can be done by calling ThreadPool.SetMinThreads(). But really, dedicating a thread pool thread to each client (and to the server, for that matter) is wasteful. It's better to use the API asynchronously, and let .NET manage the I/O. Thread pool threads are still used, but only when actually needed, i.e. when the I/O operation actually completes, and only to process that completion. You'll require fewer threads in the first place, and the thread pool will reach equilibrium sooner as demand for threads increases.

Here is a version of your code illustrating how you might do this (I removed the CreatePipeSecurity() method altogether, as it does not appear to be in any way related to the issue you're asking about):

    /// <summary>
    /// Starts the server task and 30 client tasks, waits for Enter, then
    /// cancels them all and waits for them to finish.
    /// </summary>
    /// <param name="args">Unused.</param>
    static void Main(string[] args)
    {
        // CancellationTokenSource is IDisposable; release it once every task
        // that observes its token has completed.
        using (var tokenSource = new CancellationTokenSource())
        {
            // The server is started first so it is already listening when the
            // clients begin to connect.
            var tasks = new List<Task> { RunPipeServer(tokenSource.Token) };

            for (uint i = 0; i < 30; i++)
            {
                tasks.Add(RunPipeClient(i, tokenSource.Token));
            }

            Console.ReadLine();
            tokenSource.Cancel();

            Task.WaitAll(tasks.ToArray());
        }
    }

    // Pipe name handed to both the NamedPipeServerStream and the
    // NamedPipeClientStream constructors, so clients reach this server.
    private const string PipeName = "{6FDABBF8-BFFD-4624-A67B-3211ED7EF0DC}";

    /// <summary>
    /// Asynchronous accept loop: creates a server pipe instance, awaits a client
    /// connection, logs the connection count and elapsed time, and passes the
    /// connected pipe to <c>HandleClient</c>. Runs until cancellation is requested.
    /// </summary>
    /// <param name="token">Token that stops the loop and aborts a pending wait.</param>
    /// <returns>A task that completes when the loop has exited.</returns>
    static async Task RunPipeServer(CancellationToken token)
    {
        try
        {
            var stw = new Stopwatch();
            int clientCount = 0;

            while (!token.IsCancellationRequested)
            {
                stw.Restart();

                var pipeServer = new NamedPipeServerStream(PipeName, PipeDirection.InOut,
                     NamedPipeServerStream.MaxAllowedServerInstances,
                     PipeTransmissionMode.Message, PipeOptions.Asynchronous);
                try
                {
                    // Closing the pipe is what aborts the pending wait on cancellation.
                    // The registration is scoped to this iteration; without disposing
                    // it, one callback (and the pipe it captures) would stay attached
                    // to the token for every connection ever accepted.
                    using (token.Register(() => pipeServer.Close()))
                    {
                        await Task.Factory.FromAsync(pipeServer.BeginWaitForConnection, pipeServer.EndWaitForConnection, null);
                        clientCount++;
                        Console.WriteLine($"server connection #{clientCount}. {stw.ElapsedMilliseconds} ms");

                        HandleClient(pipeServer);
                    }
                }
                catch (Exception ex)
                {
                    // Also reached when cancellation closes the pipe during the wait.
                    Console.WriteLine("RunPipeServer exception: " + ex.Message);
                    pipeServer.Dispose();
                }
            }
        }
        catch (Exception ex)
        {
            Console.WriteLine("RunPipeServer exception: " + ex.Message);
            Console.WriteLine(ex);
        }
    }

    // Per-connection handler. It only closes the server end of the pipe, so it
    // is kept synchronous here. If it grows to do real work, consider turning
    // it into an "async Task" method like the rest of this example.
    private static void HandleClient(NamedPipeServerStream pipeServer) => pipeServer.Close();

    /// <summary>
    /// Asynchronous client loop: repeatedly connects to the server (5000 ms
    /// timeout), logs the connection, and awaits a one-byte read. Runs until
    /// cancellation is requested.
    /// </summary>
    /// <param name="i">Index of this client, used only in the console output.</param>
    /// <param name="token">Token that stops the loop and is passed to the connect and read calls.</param>
    /// <returns>A task that completes when the client has exited.</returns>
    static async Task RunPipeClient(uint i, CancellationToken token)
    {
        try
        {
            var j = 0;

            try
            {
                while (!token.IsCancellationRequested)
                {
                    // PipeOptions.Asynchronous opens the handle for overlapped I/O, so the
                    // awaited read below does not need a thread blocked in a synchronous read.
                    // The using block disposes the client, so no explicit Close() is needed.
                    using (var pipeClient = new NamedPipeClientStream(".", PipeName, PipeDirection.InOut, PipeOptions.Asynchronous))
                    {
                        // Connect(5000) would block the caller, because an async method runs
                        // synchronously on its caller until the first incomplete await.
                        await pipeClient.ConnectAsync(5000, token);

                        Console.WriteLine($"connected client {i}, connection #{++j}");
                        await pipeClient.ReadAsync(new byte[1], 0, 1, token);
                    }
                }
            }
            catch (OperationCanceledException) when (token.IsCancellationRequested)
            {
                // Requested cancellation is the expected way out of the loop, not an error.
            }

            Console.WriteLine($"client {i} exiting normally");
        }
        catch (Exception ex)
        {
            Console.WriteLine($"RunPipeClient({i}) exception: {ex.Message}");
        }
    }
1
  • Thank you very much for the response. It has been very helpful and enlightening.
    – PieterB
    Commented Mar 20, 2017 at 17:37

Not the answer you're looking for? Browse other questions tagged or ask your own question.