Mutual Inclusion with Swift Concurrency
Synchronization refers to the coordination of multiple threads of execution.
When accessing a shared object, it involves mechanisms to manage mutual
exclusion—where only one thread can access a critical section at a time—and
mutual inclusion—where multiple threads can access resources concurrently when
safe to do so.
While using a mutex/lock makes it possible to provide exclusive access to a
shared mutable state/object, this mutual exclusion isn’t always ideal. In some
cases, multiple threads should be allowed to access the shared state as long
as some condition is valid.
In the following example, we have a Sendable object that is supposed to be
accessed from multiple threads. While condition is true, accessing count
should be possible. However, when it is false, other threads that want to
access count should be blocked/suspended until the condition is toggled
back to true.
final class State: Sendable {
    var count: Int? {
        condition ? _count : nil
    }

    // Compile-time error: Stored property 'condition' of 'Sendable'-conforming
    // class 'State' is mutable
    private var condition = true
    private var _count = Int.random(in: 0...10)
}
Actors are great for protecting encapsulated states. Therefore, the first step
could be changing the State object to be an actor; this should fix the
compile-time error.
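A minimal sketch of that first step, assuming nothing else about State changes: the class becomes an actor, and callers outside the actor read count with await. The toggle method and the readCount function are hypothetical additions for illustration only; the original type does not expose them.

```swift
// Sketch: `State` as an actor. Mutable stored properties are allowed here
// because the actor serializes access to them.
actor State {
    var count: Int? {
        condition ? _count : nil
    }

    private var condition = true
    private var _count = Int.random(in: 0...10)

    // Hypothetical helper (not in the original) so the condition can change.
    func toggle() {
        condition.toggle()
    }
}

// Access from outside the actor is asynchronous, so it needs `await`.
func readCount(from state: State) async -> Int? {
    await state.count
}
```

Note that the actor only removes the data race. A caller still receives nil while the condition is false rather than being suspended until it becomes true.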
But this is a simple scenario where no asynchronous work is being done within
the actor. In a more complicated case, we could be facing:
- Logic that sets the condition runs some asynchronous work.
- Instead of an optional integer, we want a required one. Instead of returning
nil, we want the caller to be suspended until the condition is true.
For instance, let’s say the State contains the current market value of the
S&P 500 Index, and we have an endpoint that returns the current market value.
However, we want to implement a rate-limiting mechanism to avoid server
overload. In other words, the value should be cached but refreshed every 60
seconds.
Let’s update our sample code by adding suspension and an async call for
updating the price:
actor State {
    private var _price = 0
    private var isOutdated: Bool { ContinuousClock.now - timestamp > .seconds(60) }
    private var timestamp: ContinuousClock.Instant = .now

    func price() async throws -> Int {
        if isOutdated {
            // If the refresh fails, fall back to the last cached price.
            try? await refresh()
        }
        return _price
    }

    private func refresh() async throws {
        _price = try await getCurrentMarketPrice()
        timestamp = ContinuousClock.now
    }
}

// Stand-in for the API call to the backend.
func getCurrentMarketPrice() async throws -> Int {
    try await Task.sleep(for: .seconds(2))
    return Int.random(in: 100...150)
}
The refresh call invokes getCurrentMarketPrice, which encapsulates the API
call to the backend. At first, we might think this is all we need. However,
there’s still a race condition due to actor reentrancy.
While one caller is suspended at an await inside the actor, the actor is not
blocked, and another caller can start executing on it. In our scenario, that
means every caller that arrives while the price is outdated triggers its own
refresh, when we only need one API call to update the price. This case can get
even more complicated when there are multiple await calls within refresh.
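The effect of reentrancy can be observed by counting backend hits. The following is a simplified sketch rather than the State actor itself: ReentrantState, fetchCount, and demonstrateReentrancy are hypothetical names, the 60-second check is reduced to a Boolean flag, and the API call is simulated with a short sleep.

```swift
// Sketch: counts how often the simulated backend is hit when several
// callers arrive while the cached value is outdated.
actor ReentrantState {
    private var _price = 0
    private var isOutdated = true      // simplified stand-in for the 60-second check
    private(set) var fetchCount = 0    // hypothetical counter, for demonstration only

    func price() async -> Int {
        if isOutdated {
            await refresh()
        }
        return _price
    }

    private func refresh() async {
        fetchCount += 1
        // Suspension point: the actor is free to run other callers here,
        // and they still observe isOutdated == true.
        try? await Task.sleep(for: .milliseconds(100))
        _price = Int.random(in: 100...150)
        isOutdated = false
    }
}

// Starts five concurrent callers and reports how many refreshes ran.
func demonstrateReentrancy() async -> Int {
    let state = ReentrantState()
    await withTaskGroup(of: Int.self) { group in
        for _ in 0..<5 {
            group.addTask { await state.price() }
        }
        for await _ in group {}
    }
    return await state.fetchCount
}
```

Because all five callers usually reach the actor before the first simulated fetch finishes, the returned count is typically greater than one, although the exact number depends on scheduling.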
One alternative is to add a second condition, such as a Boolean isLoading
flag, as in the code block below.
actor State {
    private var _price = 0
    private var isLoading = false
    private var isOutdated: Bool { ContinuousClock.now - timestamp > .seconds(60) }
    private var timestamp: ContinuousClock.Instant = .now

    func price() async throws -> Int {
        if isOutdated {
            if isLoading {
                // A refresh is already in flight. Create a policy?
            } else {
                try? await refresh()
            }
        }
        return _price
    }

    private func refresh() async throws {
        isLoading = true
        defer { isLoading = false }
        _price = try await getCurrentMarketPrice()
        timestamp = ContinuousClock.now
    }
}

// Stand-in for the API call to the backend.
func getCurrentMarketPrice() async throws -> Int {
    try await Task.sleep(for: .seconds(2))
    return Int.random(in: 100...150)
}
However, this state machine lacks a mechanism to signal that the currently
running API call is done. The logic also gets more complicated when we want to
handle unforeseen errors.
A better solution is to use something that behaves like a condition variable
to signal suspended callers. This can be achieved with an optional Task, using
its presence as a sign that a refresh call is running. If the state is
outdated but the Task is nil, the first caller creates a new Task, while
subsequent callers await its completion.
actor State {
    private var task: Task<Void, Error>?
    private var isOutdated: Bool { ContinuousClock.now - timestamp > .seconds(60) }
    private var timestamp: ContinuousClock.Instant = .now
    private var _price = 0

    func price() async throws -> Int {
        if isOutdated {
            // Only the first caller starts a refresh; every caller awaits the same task.
            if task == nil {
                try refresh()
            }
            try await task?.value
        }
        return _price
    }

    private func refresh() throws {
        guard task == nil else {
            struct RefreshError: Error {}
            throw RefreshError()
        }
        task = Task {
            // Reset the task on success and on failure, so that a failed
            // refresh does not make every later call rethrow the same error.
            do {
                let price = try await getCurrentMarketPrice()
                timestamp = ContinuousClock.now
                _price = price
                task = nil
            } catch {
                task = nil
                throw error
            }
        }
    }
}

// Stand-in for the API call to the backend.
func getCurrentMarketPrice() async throws -> Int {
    try await Task.sleep(for: .seconds(2))
    return Int.random(in: 100...150)
}
In this solution, the refresh call is no longer an async function because
the asynchronous work is wrapped inside a Task that returns Void but can
throw an error. Additionally, the logic inside price ensures that a refresh
is only invoked if the task is nil, indicating no ongoing API call. Once
the task completes, the cached price is returned, and the task is reset to
nil for future updates.
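To check that concurrent callers really share one API call, the pattern can be exercised with several simultaneous requests. The following is a minimal sketch, not the State actor itself: CachedValue, Counter, and demonstrateCoalescing are hypothetical names, the backend call is injected as a closure, and the 60-second check is reduced to a Boolean flag so the example finishes quickly.

```swift
// Sketch: an optional Task used as the "refresh in progress" signal.
actor CachedValue {
    private var task: Task<Void, Error>?
    private var value = 0
    private var isOutdated = true   // simplified stand-in for the 60-second check
    private let fetch: @Sendable () async throws -> Int

    init(fetch: @escaping @Sendable () async throws -> Int) {
        self.fetch = fetch
    }

    func get() async throws -> Int {
        if isOutdated {
            if task == nil {
                task = Task {
                    // Clear the task on both paths so a failure can be retried.
                    do {
                        value = try await fetch()
                        isOutdated = false
                        task = nil
                    } catch {
                        task = nil
                        throw error
                    }
                }
            }
            // Every caller, including the one that created the task, waits here.
            try await task?.value
        }
        return value
    }
}

// Hypothetical helper that records how often the fetch closure runs.
actor Counter {
    private(set) var value = 0
    func increment() { value += 1 }
}

// Fires five concurrent requests; returns the number of fetches and the
// number of distinct prices the callers received.
func demonstrateCoalescing() async throws -> (calls: Int, distinct: Int) {
    let counter = Counter()
    let cache = CachedValue {
        await counter.increment()
        try await Task.sleep(for: .milliseconds(100))
        return Int.random(in: 100...150)
    }
    let prices = try await withThrowingTaskGroup(of: Int.self) { group in
        for _ in 0..<5 {
            group.addTask { try await cache.get() }
        }
        var results: [Int] = []
        for try await price in group { results.append(price) }
        return results
    }
    let calls = await counter.value
    return (calls, Set(prices).count)
}
```

In this sketch the task is cleared inside the Task body, in the same actor turn that stores the new value. A caller that arrives later therefore sees either a running task or a fresh value, never a stale task.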