
We have this architecture:

queue -> message processor (horizontal scaling) -> RDBMS

Sometimes external systems dump 10k messages onto the queue and the message processor of course dutifully chews through them and dumps them to the db. But the db can't write as fast as the processor can read/transform. Which leads to high CPU usage on the db, which leads to all sorts of problems.

We have a few options:

  1. Slow down the message processor. I don't like this because it sets a hard limit to how quickly the system can process messages.

  2. Scale up the db. Workable, but it's a fixed cost to what is an intermittent problem.

  3. Fix the amount of horizontal scaling the MP can do, e.g. you may scale, but never more than 3 instances. This limits total throughput during low-load times.

  4. Set the number of retries on messages to something very high, like 100. Currently at 6.

I wonder if there's a 5th option: ask the db how swamped it is, hold off on attempting to write until the db is less busy (or put the message back on the queue if a set time passes and the db is still busy). I'm ideally looking for something that can adapt to pressure on the db writer instead of setting values ahead of time and hoping it works. With the less adaptable architecture we end up with messages in the DLQ and that requires manual intervention.

Assume for the sake of argument we can't/don't need to change indexes in the database. The queries and indexes have no more optimization to squeeze. The scaling on the db is not something that can be automated.

Has anyone done anything like this?

  • "Assume for the sake of argument we can't/don't need to change indexes in the database." Not ignoring this but as a side note, switching to a reverse index(es) can greatly improve insert performance. There are significant limitations to using them, though.
    – JimmyJames
    Commented Jun 7 at 13:43
  • @JimmyJames, is this what you are talking about? - en.wikipedia.org/wiki/Reverse_index
    – Greg Burghardt
    Commented Jun 7 at 15:06
  • @GregBurghardt Yes. They can help when you have situations when you end up inserting a lot of keys that are close together in a short period e.g. when using a sequential pk.
    – JimmyJames
    Commented Jun 7 at 15:14
  • I'd never heard of reverse indexes so that is valuable info.
    – jcollum
    Commented Jun 7 at 15:32
  • @jcollum You can't do range lookups or sort with them, so keep that in mind. As an example, I had a DBA put reverse indexes on microsecond timestamps, which was completely pointless because no one was looking up data based on the exact microsecond the row was created.
    – JimmyJames
    Commented Jun 7 at 16:01

5 Answers


You're on the right track with having the system adapt to current conditions. If a database is already busy, asking it how busy it is just adds one more thing for it to do. Instead, flip the responsibility around. Have the message processors track how long each message takes to process. Once the average duration reaches a certain threshold, have the message processor pause briefly between messages. If the average processing time isn't decreasing, pause a little longer. Once the average comes back down, reduce the pause in between messages.
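A minimal sketch of that feedback loop follows. It is Python-flavoured pseudocode with hypothetical stand-ins (receive_message and handle_message represent your queue client and processing code), and the thresholds and step sizes are placeholders you would tune, not recommendations:

```python
import time
from collections import deque

# Tunable placeholders -- assumptions, not recommendations.
WINDOW = 50           # number of recent messages to average over
THRESHOLD_S = 0.25    # "healthy" per-message processing time
STEP_S = 0.05         # how much to lengthen/shorten the pause per adjustment
MAX_PAUSE_S = 5.0

def process_loop(receive_message, handle_message):
    """Throttle based on how long each message takes, not on DB internals."""
    durations = deque(maxlen=WINDOW)
    pause = 0.0

    while True:
        message = receive_message()          # hypothetical queue client
        if message is None:
            continue

        start = time.monotonic()
        handle_message(message)              # transform + write to the DB
        durations.append(time.monotonic() - start)

        avg = sum(durations) / len(durations)
        if avg > THRESHOLD_S:
            # Downstream (DB, external API, ...) is struggling: back off a bit more.
            pause = min(pause + STEP_S, MAX_PAUSE_S)
        else:
            # Things are healthy again: gradually remove the pause.
            pause = max(pause - STEP_S, 0.0)

        if pause:
            time.sleep(pause)
```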

Basilevs brings up a good point in one of his comments. In this case, more horizontal scaling won't fix anything. Adding more instances will just push more load downstream, so you'll want to cap the number of instances at a sensible number. You will need to balance maximizing throughput when the system is running fast with pulling back on the reins to slow the horse down when the system runs slow.

This reactive adaptation relies on slowing things down, which you don't want, but there isn't another way. The advantage here is building in the means to detect that messages can be processed faster, which eventually eliminates the pause in between messages.

You don't need to rely strictly on average processing times. You could consider tracking processing times during certain windows of time each day and basing the pause time on the standard deviation. The point is to come up with some measurable metric on the message processing side so the message processor can judge for itself how long it should wait between messages, including whether it needs to wait at all.

I feel like there should be a buzzword or technique you can search for related to this. My memory is failing me at the moment, but "throttling" or "adaptive throttling" comes to mind. The message processor throttles itself based on environmental factors outside its control.

A properly balanced throttling mechanism should avoid most situations where the database is unresponsive. You probably can't avoid all of them, though, so a retry mechanism is still advisable. Some messages will still end up needing human intervention, so keep those processes in place, but the goal is to dramatically reduce them.

Implementing a throttling mechanism isn't the end to this issue. Be careful not to simply push this problem downstream. Clearly, you have heavy traffic during certain times of day. I would suggest calculating how many messages get processed during these periods of time, and increase the message queue size to accommodate the surge. If you cannot support the necessary throughput, you at least want to handle the flood gracefully without dropping messages.


Response to some comments:

I think the load of asking the db what its current CPU usage is would be pretty minimal, no? When it's once per message?

The database is a separate system. Checking CPU usage won't help if the database is I/O bound or memory bound. Your service cannot fix the database's CPU or memory usage, kill its processes, or free up its memory, so it should monitor symptoms, not root causes. The remedy for a memory-bound database is the same as for one maxing out its CPU: slow down processing. Don't make assumptions about the root cause; just respond to the symptom.

Plus, any reliance on external systems during message processing is going to poison the data for avg processing time. We sometimes have to hit an API during message processing.

All the more reason to monitor something that is agnostic to a root cause. So maybe the database is running fast, but now some other service is getting hammered. What's the measurable symptom here? Time spent processing each message. What's the solution for this other service running slow? Throttle your service.

  • "Adaptive throttling" is the correct term; IBM seems to have a patent for it. This is exactly the kind of thing that makes me dislike software patents.
    – Doc Brown
    Commented Jun 7 at 11:53
  • I feel like there should be plenty of prior art for "don't do things as fast when something is busy" but alas, I'm not a lawyer willing to fight the battle... or a lawyer.
    Commented Jun 7 at 11:56
    Personally, I think the level of inventiveness for this idea was way too low (even at the time when the patent was filed), but the id***s in the patent offices all over the world seem to authorise any crap.
    – Doc Brown
    Commented Jun 7 at 12:25
    @DocBrown Agreed. People think that patents protect the 'little guy' but in reality, they are artillery shells that large companies lob at each other and/or use to bleed small companies dry in court defending against them. It doesn't matter much whether they are valid or not; it still costs a lot to defend against them. I thought there was some improvement in the US on this some years back, though.
    – JimmyJames
    Commented Jun 7 at 13:37
  • We have heavy traffic at somewhat random times. The traffic level is generally stable, but once in a while an upstream will just dump a few thousand messages on our queue to process.
    – jcollum
    Commented Jun 7 at 15:29

There are a lot of factors that aren't really detailed here, but based on what you have written, there are some potential options that could be fairly simple to implement. I'm going to ignore option #2 based on the assumed constraints near the end of your question.

Your first option is probably the simplest to understand but not necessarily the easiest to put in place. If the only load on the DB that you need to worry about is coming from this process, it could be an optimal solution. In simple terms, you would find the highest rate the DB can tolerate and cap it at that. It won't slow your application down under normal load assuming that your DB can handle normal load. This is sort of the basic concept behind queuing. You build a system that can handle your average load and buffer when incoming load is faster than that (or some multiple of the average.) Your system must be able to handle average load at the very least, or it will fall behind over time (by definition.)
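For illustration, that cap can be as crude as a fixed inter-message delay derived from the measured maximum rate. This is only a sketch: the rate and the process_one callable are assumptions, stand-ins for whatever your benchmark and processing code look like.

```python
import time

# Assumption: you've benchmarked the DB and found its highest sustainable rate.
MAX_RATE_PER_SEC = 200
MIN_INTERVAL_S = 1.0 / MAX_RATE_PER_SEC

def capped_loop(process_one):
    """Process messages, but never faster than MAX_RATE_PER_SEC."""
    while True:
        started = time.monotonic()
        process_one()                            # read + transform + write one message
        elapsed = time.monotonic() - started
        if elapsed < MIN_INTERVAL_S:
            time.sleep(MIN_INTERVAL_S - elapsed) # pad out to the capped rate
```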

Option #3 is essentially a form of #1 that is straightforward to implement. You find the maximum number of producers your DB can handle and run that many. If the incoming volume exceeds what they can handle, the queue will grow until the spike is over. Again, this should not limit anything when volume is low, because non-spike volume is, in general, lower than average volume. I'm assuming your system can handle average volume (over some reasonable period) or your solution is inherently undersized.

Your 4th option is interesting because it implies that transactions are failing. That would suggest the load far exceeds the capacity of the DB. I don't like the solution you suggest because it could actually make things worse, but it might play into a decent approach: instead of increasing the retries, add a 'backoff' throttle when these transactions fail. Rather than asking the DB how busy it is, you treat these errors as the DB telling you it can't handle the load you have already given it. When a producer encounters an error like this, have it wait/sleep for a short period before trying again. If the error occurs again on retry, sleep again, and consider sleeping longer on each successive failure for the same message (e.g. doubling the wait each time.) This creates a sort of automatic feedback loop which dynamically reduces load when the DB is overwhelmed. It's not the prettiest solution, but it's very simple to put in place.
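A sketch of that error-driven backoff might look like the following. Here insert is a hypothetical callable wrapping your actual DB write; in practice you would narrow retryable to your driver's timeout/overload exceptions rather than catching everything.

```python
import random
import time

def insert_with_backoff(insert, message, max_retries=6, base_delay_s=0.5,
                        retryable=(Exception,)):
    """Retry a failed insert, roughly doubling the wait after each failure."""
    for attempt in range(max_retries + 1):
        try:
            insert(message)
            return
        except retryable:
            if attempt == max_retries:
                raise                      # give up; message goes back to the queue / DLQ
            delay = base_delay_s * (2 ** attempt)
            time.sleep(delay + random.uniform(0, delay))  # jitter avoids synchronized retries
```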

This kind of dynamic approach is useful if the DB capacity to handle these transactions is not constant or nearly constant. That is, if you have other kinds of workloads on the DB, there's no one maximum capacity that you can pick.

Another option that you might want to consider is keeping track of the total number of outstanding inserts i.e. requests submitted but not complete. This is really just a refinement of the option of limiting producers. For example, if you have 5 producers, they might spend half their time preparing inserts and the other half doing the inserts and waiting. In that situation you might want to limit the number of concurrent inserts to 3. The idea is that if the DB gets bogged down, you will end up with more producers waiting and you can preemptively reduce the load until the DB has gotten through its current work. This solution requires some sort of coordination between producers which can present some challenges on its own, though. The big advantage of the error-based solution discussed earlier is that it doesn't need any sort of explicit shared semaphore.
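As a sketch, capping outstanding inserts inside a single processor instance could look like this; the cross-instance coordination mentioned above (e.g. an external semaphore in something like Redis) is deliberately not shown.

```python
import threading

MAX_INFLIGHT_INSERTS = 3
_inflight = threading.BoundedSemaphore(MAX_INFLIGHT_INSERTS)

def guarded_insert(insert, message):
    """Block until fewer than MAX_INFLIGHT_INSERTS inserts are outstanding."""
    with _inflight:              # held for the duration of the insert
        insert(message)
```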

As far as interrogating the DB goes, it's definitely possible but I don't think it's necessarily as straightforward as just checking the CPU. Poor RDBMS performance is not always CPU-bound. It can be caused by all kinds of contention, including but not limited to locks, memory usage, and disk access. You can get these kinds of stats, but there's no standard interface for them the way there is for SQL, and they may not be available in real time. I think trying to do this will be far more difficult than a 'self-centering' solution as mentioned above.

  • A more complex variation of the backoff would be, rather than using it to determine a sleep, to feed it into a rate limiter. Every time you encounter an error, decrease the allowed rate. For every success, increase it, but by a smaller amount. So rather than just delaying the current message, you also hold back the rate at which later messages are processed. (This is the concept used by TCP congestion control.)
    – user1937198
    Commented Jun 7 at 18:14
  • @user1937198 Yes, there are many ways to refine the basic concept that I've explained here. My advice would generally be to start with the simplest thing that works. The potential consequence of lowering the rate explicitly is that you could end up with a lowered processing rate for too long after the bottleneck is cleared. Not insurmountable, just potentially a little tricky.
    – JimmyJames
    Commented Jun 7 at 18:33
    Note that with a limited number of MPs, no backoff is needed, as the number of concurrent inserts is limited. In other words, Option 3 is superior to all other suggestions.
    – Basilevs
    Commented Jun 7 at 19:23
  • @Basilevs I think you are assuming there are no other workloads on the DB. The OP doesn't really specify whether that's a factor but that's typically the kind of thing that I have had to deal with.
    – JimmyJames
    Commented Jun 7 at 19:52
  • @Basilevs To give an example that's a different but parallel type of situation, I was pulled into some troubleshooting not too long ago around a system. Skipping over a lot of details, the relevant part was that the hosts were getting overwhelmed and spending a lot of time in GC pauses. Once in this state, they really needed to be left alone in order to recover and get back to their normal processing capacity.
    – JimmyJames
    Commented Jun 7 at 20:34

If there is a highly variable workload, then the system needs to be highly variable. The processor(s) are variable, but the DB is not.

queue -> message processor (horizontal scaling) -> RDBMS (vertical scaling)

Several cloud providers offer vertical scaling for the database instance. This allows you to scale up during busy times and scale down during nominal usage. One static instance can only do so much.

In a prior life, we had a similar setup (queues and database), and the database was the bottleneck when it was a static instance size. The database would routinely max out processor and memory. It was bad; users couldn't even log in because the database was running so hot. So we vertically auto-scaled the database instance. This allowed for more throughput and adapted better to high-volume situations. It also helped us avoid purchasing a rather large DB when we needed it only ~20% of the time.

Through performance testing, we tried to keep our maximum usage around 60-80% of memory/CPU, since end users were using the system alongside the background processing, so we always kept some resource percentage in reserve. That also let us set a high-water mark (max instance) for the processor instances so they wouldn't overrun the DB even in its largest form. It's a balancing act.

We would still max out the vertical instance at various points throughout the day. At night, when the system was nominal, it would scale back down to a smaller or even the smallest instance. When scaling up, you get a bigger machine with more processing horsepower and memory. Since your load is variable, I would recommend that approach if possible.

Another thing I'll mention is switching from a messaging architecture to more of an ETL-based one. Instead of 10K messages, your system receives 1 file with 10000 items in it, and the file is bulk loaded into the database. This will be much more efficient and time saving than processing individual messages. If this is possible, I would advise it. Take the volume of data changes into account and batch/bulk load them. If you are getting 10s of 1000s of updates, load them in batch instead of 1 at a time.
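A rough sketch of the bulk-load step, assuming a CSV file of items and a DB-API driver with psycopg2-style placeholders (the table and column names are invented for illustration):

```python
import csv

def bulk_load(conn, path, batch_size=1000):
    """Load a whole file in large executemany batches instead of one INSERT per message."""
    with open(path, newline="") as f:
        reader = csv.reader(f)
        batch = []
        with conn.cursor() as cur:
            for row in reader:
                batch.append(row)
                if len(batch) >= batch_size:
                    cur.executemany(
                        "INSERT INTO items (id, payload) VALUES (%s, %s)", batch)
                    batch.clear()
            if batch:                         # flush whatever is left over
                cur.executemany(
                    "INSERT INTO items (id, payload) VALUES (%s, %s)", batch)
    conn.commit()
```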

Also, if these messages can wait, consider scheduling the messages during off hours. The messages will sit in the queue for a bit longer but there should be the maximum amount of database resources available, and the run time would be more predictable.

  • Good point about switching to ETL vs messages. We're looking into that.
    – jcollum
    Commented Jun 12 at 15:48

batch size

A client's perceived database performance is characterized by

  • latency (seconds to COMMIT), and
  • throughput (rows per second)

Often we can trade one against the other. You were a bit vague on details of your "RDBMS" and on what the limiting bottleneck resource was, but we can safely assume that clients are seeing both measures impacted during busy times. Our goal is to do better on at least one measure. We will focus on a single-threaded message processor, with no parallelism. Assume it has a local .CSV file with ten thousand rows that it wants to shovel into a table.

In most relational databases, throughput will suffer if we go for minimum app-level latency. So for a processor whose while loop repeatedly does

  • read one CSV line
  • INSERT one row
  • COMMIT

we will typically see low latency and poor throughput, perhaps on the order of a hundred rows per second, less if there are WAN round trips slowing things down. Even with a local RDBMS, we're waiting on the latency of the I/O subsystem to report it has persisted a row, plus some per-row processing overhead.

That was a batch size of 1. If we choose a larger batch size K, we can send K rows per COMMIT, amortizing some of the overhead, taking advantage of TCP WAN bandwidth to a distant DB server so round-trip latencies matter less, and most importantly letting the I/O subsystem do more useful work per request. Benchmark your local database and see! Choosing K anywhere from about 10 to 1000 will often put you near the sweet spot, maximizing throughput: at least a thousand rows inserted per second, and typically more than an order of magnitude better than that.

tl;dr: INSERT multiple rows per COMMIT for greater efficiency, leading to higher throughput and shorter task completion time.

The tradeoff, of course, is that inserting K rows takes longer than for one row, so latency to persist that first row will be inflated.
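A small benchmark sketch in that spirit, so you can measure the sweet spot for K on your own hardware. DB-API style with psycopg2-like placeholders; the table, columns, and conn object are assumptions.

```python
import time

def rows_per_second(conn, rows, k):
    """INSERT len(rows) rows, committing every k rows; return measured throughput."""
    start = time.monotonic()
    with conn.cursor() as cur:
        for i in range(0, len(rows), k):
            cur.executemany(
                "INSERT INTO messages (id, body) VALUES (%s, %s)", rows[i:i + k])
            conn.commit()                     # one COMMIT per batch of k rows
    return len(rows) / (time.monotonic() - start)

# e.g. against ~10k synthetic rows:
# for k in (1, 10, 100, 1000):
#     print(k, rows_per_second(conn, rows, k))
```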

fairness

On a busy database system you have various competing users and needs. To avoid having INSERT traffic unduly impact interactive queries, consider telling the message processor to unconditionally delay D seconds -- sleep(D); -- after COMMIT'ing each batch of records. This guarantees that some amount of DB resource is available for other uses. During times of high message ingest rate it will naturally lead to larger per-COMMIT batches, arguably a good thing.

benchmark

I urge you to take out a stopwatch, generate synthetic load with different values of K, and verify my timing claims against the RDBMS you're using. Do it on a lightly loaded system, perhaps around midnight, for clean timing figures. Then verify at noon that peak-time performance follows a similar trend.

Consider publishing such timing observations as a separate Answer post.

temp table

The OP was rather vague on what the bottleneck resource is. Sometimes we have lots of hardware resource, in the form of {CPU, I/O bandwidth, network bandwidth, memory}, and are bottlenecked on lock contention. That is, clients all need to acquire a mutex to access some common table, perhaps because they are INSERTing subject to some UNIQUE index constraint.

If that matches your situation, the message processor may want to CREATE a new table for its exclusive use, and INSERT lots of rows into it at some high rate, committing them at the end. With that accomplished, we're still faced with a "database to database" transfer of rows SELECTed from the temp table and INSERTed at their final destination. When lock contention is limiting forward progress, this two-step decoupling can sometimes improve overall throughput, in part because the query planner can "see into the future" by inspecting statistics on the temp table. Definitely get out your stopwatch and verify, if you go this route.

For technical reasons related to what can happen during ROLLBACK, most relational databases can TRUNCATE (or DROP) such a temp table much more quickly than when using a blind DELETE to remove all rows.
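As a sketch, the two-step decoupling might look like this. The SQL is PostgreSQL-flavoured and embedded in Python; the exact temp-table syntax and the table names vary by vendor and are assumptions here.

```python
def load_via_staging(conn, rows):
    """Insert into a private staging table first, then move rows in one set-based statement."""
    with conn.cursor() as cur:
        # 1. A table only this processor touches: no lock contention on the hot table yet.
        cur.execute("CREATE TEMPORARY TABLE staging_messages (LIKE messages)")
        cur.executemany(
            "INSERT INTO staging_messages (id, body) VALUES (%s, %s)", rows)

        # 2. One set-based transfer into the final destination.
        cur.execute("INSERT INTO messages SELECT * FROM staging_messages")

        # 3. TRUNCATE (or DROP) is typically much cheaper than DELETE for clearing it out.
        cur.execute("TRUNCATE staging_messages")
    conn.commit()
```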

csv import

Some database vendors support directly reading a local .CSV file. If that option is available to you, get out your stopwatch and see if it is preferable to ten thousand INSERT statements.
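For example, with PostgreSQL and psycopg2 (an assumption; other vendors have analogues such as LOAD DATA INFILE or BULK INSERT):

```python
def copy_csv(conn, path):
    """Stream a local CSV file straight into the table via COPY, bypassing per-row INSERTs."""
    with open(path) as f, conn.cursor() as cur:
        cur.copy_expert("COPY messages (id, body) FROM STDIN WITH (FORMAT csv)", f)
    conn.commit()
```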

  • K could be dynamic to include more and more messages as load increases trading latency for throughput. Why is there an assumption that throughput would suffer on larger K?
    – Basilevs
    Commented Jun 23 at 9:14
  • @Basilevs, I'm just going with observed timings. As K increases from 1 it's easy to understand why throughput increases, as now we're able to fill all of a 4-KiB disk block with useful data. The effect is quite pronounced. Continuing to increase K, we're trying to ensure that certain queues never empty, so there's always some useful work to do. But as K continues to increase, there's no further benefit, and we actually start needlessly triggering cache evictions. So the workload no longer fits in L1, L2, or L3. In extreme cases we can even induce paging behavior.
    – J_H
    Commented Jun 24 at 4:19
    I don't understand how more smaller batches would cause fewer cache evictions than fewer large batches. Indeed, I would expect fewer CPU cache evictions due to linearity of processing. Could you add a section about your observations?
    – Basilevs
    Commented Jun 24 at 9:11
  • Not sure about paging - I've never worked with heavily paged requests, but seems more plausible. Thanks!
    – Basilevs
    Commented Jun 24 at 9:13
Disclaimer: I've never horizontally scaled producers with an ACID consumer. The described approach is proven to work well with object databases.

I suggest using Nagle's algorithm. It combines the throughput of large batches under heavy load with good latency under light load.

The idea is to keep a pool of processed messages, and store them all in a single batch. Store operations should be executed one at a time.

If the database is slow, it will take time to store a batch, and the MP will accumulate more messages for the next batch, increasing throughput thanks to the larger batch size. If a transaction fails, the messages from that batch are added back to the queue, increasing the batch size (and thus throughput) for the next attempt.
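A rough sketch of the idea, with a single writer draining whatever has accumulated since the last store. store_batch and requeue are hypothetical hooks into your DB writer and message broker.

```python
import threading
import time

class NagleBatcher:
    """Accumulate processed messages while a store is in flight; write one batch at a time."""

    def __init__(self, store_batch, requeue):
        self.store_batch = store_batch   # writes a list of messages in one transaction
        self.requeue = requeue           # puts messages back on the queue on failure
        self.pending = []
        self.lock = threading.Lock()

    def submit(self, message):
        """Called by processing threads when a message is ready to persist."""
        with self.lock:
            self.pending.append(message)

    def writer_loop(self):
        """Single-threaded store loop: batch size grows on its own when the DB is slow."""
        while True:
            with self.lock:
                batch, self.pending = self.pending, []
            if not batch:
                time.sleep(0.05)         # nothing ready yet
                continue
            try:
                self.store_batch(batch)
            except Exception:
                self.requeue(batch)      # failed messages rejoin a later, larger batch
```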

There are two variations of this approach:

Global queue

Introduce a separate queue in your message broker for processed messages and consume them from a dedicated single-threaded service (preferably hosted next to the DB).

Pros

  • Largest batch size
  • A single spot to fine-tune DB performance (like introducing a maximum batch size, if latency becomes unbearable or DB hits paging like @J_H warns)
  • Clear horizontal scaling policy - allocate a new MP when input queue is large irrespective of DB load

Cons

  • Latency of unrelated messages
  • An extra trip to the message broker

Local queue

Accumulate processed messages on MPs.

Pros

  • Good lock locality on properly sharded MPs (when no two MPs are working on the same data)
  • Potentially increased throughput

Cons

  • Failing transactions are distributed, increasing network load.
  • Scaling requires polling MPs for their queue size (do not allocate a new instance if any queue is large)
  • We are already using a queue in front of the message processor. This answer seems to have missed that. No?
    – jcollum
    Commented Jun 24 at 16:10
    @jcollum queues are only needed before bottlenecks. If your message processors are infinitely scalable, they don't have to have a queue before them (a load balancer would be enough). As the DB is another bottleneck, you need another queue immediately before it.
    – Basilevs
    Commented Jun 24 at 17:40
  • I'm not saying that using a queue for load balancing is wrong - just that load balancing does not require queues and can be done cheaper (for example, round-robin LB gives very good results without any storage requirements)
    – Basilevs
    Commented Jun 24 at 17:45
