
The Kafka documentation says the following:

Replicas that are still fetching messages from leaders but did not catch up to the latest messages in replica.lag.time.max.ms will be considered out of sync.

I am not sure what this means exactly. Does it mean:

  1. A replica must be behind by 0 messages at least once every replica.lag.time.max.ms to be considered in sync, or
  2. The latest message fetched by the replica must not be older than replica.lag.time.max.ms?

These two definitions are not the same thing, because if it means #2, a replica could always be 2 or 3 messages behind and still stay in sync, as long as it never drifts by more than replica.lag.time.max.ms.

But if it means #1, the replica would need to consume strictly faster than the data arrives.

2 Answers


It's number 2. Replicas are in sync if there is no data on the leader older than the lag time that hasn't been replicated. Please do open a JIRA if you feel the wording should be updated, because that's an easy change to make :)
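To illustrate that rule, here is a minimal sketch (with made-up offsets and timestamps; `Follower` and `isInSync` are hypothetical names, not Kafka's actual classes): a follower can be a few messages behind the leader's log end offset (LEO) and still be in sync, because only the time since it was last fully caught up matters.

```scala
// Hypothetical model of the ISR time-based rule, not Kafka's real code.
case class Follower(logEndOffset: Long, lastCaughtUpTimeMs: Long)

val replicaLagTimeMaxMs = 10000L // default replica.lag.time.max.ms
val nowMs = 100000L              // pretend current wall-clock time
val leaderLeo = 100L

// In sync iff the follower was fully caught up within the last lag window.
def isInSync(f: Follower): Boolean =
  (nowMs - f.lastCaughtUpTimeMs) <= replicaLagTimeMaxMs

// Behind by 3 messages, but fully caught up 2 seconds ago: still in sync.
val slightlyBehind = Follower(logEndOffset = 97, lastCaughtUpTimeMs = 98000L)
// Behind and last fully caught up 15 seconds ago: out of sync.
val lagging = Follower(logEndOffset = 80, lastCaughtUpTimeMs = 85000L)

assert(isInSync(slightlyBehind))
assert(!isInSync(lagging))
```

Note that `logEndOffset` plays no role in the check itself; the offset gap only matters indirectly, through how recently the follower was fully caught up.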


I think it's closer to #1, but not exactly. I'll paste some source code (from Kafka 1.0.2) to help explain.

Replicas are marked out of sync by Partition.getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long):

def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
  /**
   * there are two cases that will be handled here -
   * 1. Stuck followers: If the leo of the replica hasn't been updated for maxLagMs ms,
   *                     the follower is stuck and should be removed from the ISR
   * 2. Slow followers: If the replica has not read up to the leo within the last maxLagMs ms,
   *                    then the follower is lagging and should be removed from the ISR
   * Both these cases are handled by checking the lastCaughtUpTimeMs which represents
   * the last time when the replica was fully caught up. If either of the above conditions
   * is violated, that replica is considered to be out of sync
   *
   **/
  val candidateReplicas = inSyncReplicas - leaderReplica

  val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
  if (laggingReplicas.nonEmpty)
    debug("Lagging replicas are %s".format(laggingReplicas.map(_.brokerId).mkString(",")))

  laggingReplicas
}
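The filter above only looks at lastCaughtUpTimeMs. A toy re-implementation with hypothetical replica states (`ReplicaState` and `getOutOfSync` are made-up names for illustration) shows which followers would be dropped from the ISR:

```scala
// Simplified stand-in for the filter inside getOutOfSyncReplicas.
case class ReplicaState(brokerId: Int, lastCaughtUpTimeMs: Long)

def getOutOfSync(replicas: Set[ReplicaState], nowMs: Long, maxLagMs: Long): Set[ReplicaState] =
  replicas.filter(r => (nowMs - r.lastCaughtUpTimeMs) > maxLagMs)

val nowMs = 60000L
val maxLagMs = 10000L
val followers = Set(
  ReplicaState(brokerId = 1, lastCaughtUpTimeMs = 55000L), // caught up 5s ago  -> stays in ISR
  ReplicaState(brokerId = 2, lastCaughtUpTimeMs = 40000L)  // caught up 20s ago -> out of sync
)

val outOfSync = getOutOfSync(followers, nowMs, maxLagMs)
assert(outOfSync.map(_.brokerId) == Set(2))
```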

Replica.lastCaughtUpTimeMs is updated by Replica.updateLogReadResult(logReadResult: LogReadResult):

/**
 * If the FetchRequest reads up to the log end offset of the leader when the current fetch request is received,
 * set `lastCaughtUpTimeMs` to the time when the current fetch request was received.
 *
 * Else if the FetchRequest reads up to the log end offset of the leader when the previous fetch request was received,
 * set `lastCaughtUpTimeMs` to the time when the previous fetch request was received.
 *
 * This is needed to enforce the semantics of ISR, i.e. a replica is in ISR if and only if it lags behind leader's LEO
 * by at most `replicaLagTimeMaxMs`. These semantics allow a follower to be added to the ISR even if the offset of its
 * fetch request is always smaller than the leader's LEO, which can happen if small produce requests are received at
 * high frequency.
 **/
def updateLogReadResult(logReadResult: LogReadResult) {
  if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset)
    _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, logReadResult.fetchTimeMs)
  else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)
    _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs)

  logStartOffset = logReadResult.followerLogStartOffset
  logEndOffset = logReadResult.info.fetchOffsetMetadata
  lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset
  lastFetchTimeMs = logReadResult.fetchTimeMs
}
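To see why a follower that is always a message behind can still stay in the ISR, here is a simplified trace of the update rule above, with made-up offsets and timestamps (the `onFetch` helper is a stripped-down sketch of updateLogReadResult, not the real method): each fetch reads up to the LEO seen at the previous fetch, so lastCaughtUpTimeMs keeps advancing even though the fetch offset never matches the current LEO.

```scala
// Stripped-down sketch of the lastCaughtUpTimeMs update logic.
var lastCaughtUpTimeMs = 0L
var lastFetchLeaderLogEndOffset = 0L
var lastFetchTimeMs = 0L

def onFetch(fetchOffset: Long, leaderLeo: Long, fetchTimeMs: Long): Unit = {
  if (fetchOffset >= leaderLeo)
    lastCaughtUpTimeMs = math.max(lastCaughtUpTimeMs, fetchTimeMs)       // caught up now
  else if (fetchOffset >= lastFetchLeaderLogEndOffset)
    lastCaughtUpTimeMs = math.max(lastCaughtUpTimeMs, lastFetchTimeMs)   // caught up as of previous fetch
  lastFetchLeaderLogEndOffset = leaderLeo
  lastFetchTimeMs = fetchTimeMs
}

// One new message arrives between every pair of fetches:
onFetch(fetchOffset = 10, leaderLeo = 11, fetchTimeMs = 1000) // behind the current LEO
onFetch(fetchOffset = 11, leaderLeo = 12, fetchTimeMs = 2000) // read up to previous LEO (11)
onFetch(fetchOffset = 12, leaderLeo = 13, fetchTimeMs = 3000) // read up to previous LEO (12)

// lastCaughtUpTimeMs advanced to the previous fetch time even though the
// follower never reached the current LEO.
assert(lastCaughtUpTimeMs == 2000L)
```

So the answer to the original question is effectively #2: as long as each fetch reads at least up to the LEO the leader had at the previous fetch, the follower's lastCaughtUpTimeMs keeps moving forward and it never ages out of the ISR.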
