I think it's closer to #1, but not exactly. Here is the relevant source code to help explain; it is taken from version 1.0.2.
A replica is judged out of sync by Partition.getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long):
def getOutOfSyncReplicas(leaderReplica: Replica, maxLagMs: Long): Set[Replica] = {
  /**
   * there are two cases that will be handled here -
   * 1. Stuck followers: If the leo of the replica hasn't been updated for maxLagMs ms,
   *                     the follower is stuck and should be removed from the ISR
   * 2. Slow followers: If the replica has not read up to the leo within the last maxLagMs ms,
   *                    then the follower is lagging and should be removed from the ISR
   * Both these cases are handled by checking the lastCaughtUpTimeMs which represents
   * the last time when the replica was fully caught up. If either of the above conditions
   * is violated, that replica is considered to be out of sync
   *
   **/
  val candidateReplicas = inSyncReplicas - leaderReplica
  val laggingReplicas = candidateReplicas.filter(r => (time.milliseconds - r.lastCaughtUpTimeMs) > maxLagMs)
  if (laggingReplicas.nonEmpty)
    debug("Lagging replicas are %s".format(laggingReplicas.map(_.brokerId).mkString(",")))
  laggingReplicas
}
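To make the arithmetic of that check concrete, here is a minimal standalone sketch of the same predicate. `ReplicaState` and `IsrLagCheck` are hypothetical names invented for illustration, not Kafka classes, and the timestamps are made-up values.

```scala
// Hypothetical stand-in for Kafka's Replica: only the fields the check needs.
final case class ReplicaState(brokerId: Int, lastCaughtUpTimeMs: Long)

object IsrLagCheck {
  // Same predicate as getOutOfSyncReplicas: a follower is out of sync when
  // more than maxLagMs has elapsed since it was last fully caught up.
  def outOfSync(followers: Set[ReplicaState], nowMs: Long, maxLagMs: Long): Set[ReplicaState] =
    followers.filter(r => nowMs - r.lastCaughtUpTimeMs > maxLagMs)

  def main(args: Array[String]): Unit = {
    val followers = Set(
      ReplicaState(brokerId = 1, lastCaughtUpTimeMs = 95000L), // caught up 5 s ago
      ReplicaState(brokerId = 2, lastCaughtUpTimeMs = 89000L)  // caught up 11 s ago
    )
    val lagging = outOfSync(followers, nowMs = 100000L, maxLagMs = 10000L)
    println(lagging.map(_.brokerId)) // only broker 2 exceeds the 10 s limit
  }
}
```

Note that the check never compares offsets directly. Everything depends on how lastCaughtUpTimeMs is maintained.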
Replica.lastCaughtUpTimeMs is updated by Replica.updateLogReadResult(logReadResult: LogReadResult):
/**
 * If the FetchRequest reads up to the log end offset of the leader when the current fetch request is received,
 * set `lastCaughtUpTimeMs` to the time when the current fetch request was received.
 *
 * Else if the FetchRequest reads up to the log end offset of the leader when the previous fetch request was received,
 * set `lastCaughtUpTimeMs` to the time when the previous fetch request was received.
 *
 * This is needed to enforce the semantics of ISR, i.e. a replica is in ISR if and only if it lags behind leader's LEO
 * by at most `replicaLagTimeMaxMs`. These semantics allow a follower to be added to the ISR even if the offset of its
 * fetch request is always smaller than the leader's LEO, which can happen if small produce requests are received at
 * high frequency.
 **/
def updateLogReadResult(logReadResult: LogReadResult) {
  if (logReadResult.info.fetchOffsetMetadata.messageOffset >= logReadResult.leaderLogEndOffset)
    _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, logReadResult.fetchTimeMs)
  else if (logReadResult.info.fetchOffsetMetadata.messageOffset >= lastFetchLeaderLogEndOffset)
    _lastCaughtUpTimeMs = math.max(_lastCaughtUpTimeMs, lastFetchTimeMs)

  logStartOffset = logReadResult.followerLogStartOffset
  logEndOffset = logReadResult.info.fetchOffsetMetadata
  lastFetchLeaderLogEndOffset = logReadResult.leaderLogEndOffset
  lastFetchTimeMs = logReadResult.fetchTimeMs
}
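The effect of the two branches is easier to see by tracing a few fetches. The following is a simplified model, not Kafka's Replica class: `FollowerModel` and `onFetch` are hypothetical names, the initial values of 0 are an assumption of the model, and the offsets and timestamps are invented.

```scala
// Hypothetical model of the per-follower state the leader keeps.
final class FollowerModel {
  var lastCaughtUpTimeMs: Long = 0L
  private var lastFetchLeaderLogEndOffset: Long = 0L
  private var lastFetchTimeMs: Long = 0L

  // Mirrors the branch structure of updateLogReadResult shown above.
  def onFetch(fetchOffset: Long, leaderLogEndOffset: Long, fetchTimeMs: Long): Unit = {
    if (fetchOffset >= leaderLogEndOffset)
      // Follower has read up to the leader's current LEO.
      lastCaughtUpTimeMs = math.max(lastCaughtUpTimeMs, fetchTimeMs)
    else if (fetchOffset >= lastFetchLeaderLogEndOffset)
      // Follower has read up to the LEO seen at the previous fetch.
      lastCaughtUpTimeMs = math.max(lastCaughtUpTimeMs, lastFetchTimeMs)
    lastFetchLeaderLogEndOffset = leaderLogEndOffset
    lastFetchTimeMs = fetchTimeMs
  }
}

object FollowerModelDemo {
  def main(args: Array[String]): Unit = {
    val f = new FollowerModel
    f.onFetch(fetchOffset = 100L, leaderLogEndOffset = 100L, fetchTimeMs = 1000L) // fully caught up at t=1000
    f.onFetch(fetchOffset = 100L, leaderLogEndOffset = 150L, fetchTimeMs = 2000L) // behind now, but at the previous LEO
    f.onFetch(fetchOffset = 150L, leaderLogEndOffset = 200L, fetchTimeMs = 3000L) // again one fetch behind: credited t=2000
    f.onFetch(fetchOffset = 160L, leaderLogEndOffset = 260L, fetchTimeMs = 4000L) // below the previous LEO too: no update
    println(f.lastCaughtUpTimeMs) // prints 2000
  }
}
```

In this trace the follower never reaches the leader's current LEO after the first fetch, yet lastCaughtUpTimeMs still advances as long as it reaches the LEO recorded at the previous fetch. Only when it falls behind that older LEO as well does the timestamp stop moving, and the replica is dropped from the ISR once the gap exceeds maxLagMs.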