Code & Craft by Kasra

Mutual Inclusion by Swift Concurrency

Synchronization refers to the coordination of multiple threads of execution. When accessing a shared object, it involves mechanisms to manage mutual exclusion—where only one thread can access a critical section at a time—and mutual inclusion—where multiple threads can access resources concurrently when safe to do so.

While using a mutex/lock makes it possible to provide exclusive access to a shared mutable state/object, this mutual exclusion isn’t always ideal. In some cases, multiple threads should be allowed to access the shared state as long as some condition is valid.

In the following example, we have a Sendable object that is supposed to be accessed from multiple threads. While condition is true, accessing count should be possible. However, when it is false, other threads that want to access count should be blocked/suspended until the condition is toggled back to true.

final class State: Sendable {
  var count: Int? {
    condition ? _count : nil
  }

  // Compile-time error: Stored property 'condition' of 'Sendable'-conforming
  // class 'State' is mutable
  private var condition = true
  private var _count = Int.random(in: 0...10)
}

Actors are great for protecting encapsulated states. Therefore, the first step could be changing the State object to be an actor; this should fix the compile-time error.
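As a minimal sketch of that first step, the class can be rewritten as an actor. The name GatedState and the toggle() helper are hypothetical and added only for illustration; the original example does not show how the condition changes.

```swift
// Sketch only: `GatedState` and `toggle()` are illustrative names,
// not part of the original example.
actor GatedState {
  // Mutable stored properties are allowed here because the actor
  // serializes all access to its isolated state.
  private var condition = true
  private var _count = Int.random(in: 0...10)

  // Returns the count while the condition holds, otherwise nil.
  var count: Int? {
    condition ? _count : nil
  }

  // Hypothetical helper so callers can flip the condition.
  func toggle() {
    condition.toggle()
  }
}
```

From outside the actor, every access needs an await, for example `let value = await state.count`, because the caller may have to wait for its turn on the actor.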

But this is a simple scenario where no asynchronous work is being done within the actor. In a more complicated case, we could be facing:

  1. Logic that sets the condition runs some asynchronous work.
  2. Instead of an optional integer, we want a required one. Instead of returning nil, we want the caller to be suspended until the condition is true.

For instance, let’s say the State contains the current market value of the S&P 500 Index, and we have an endpoint that returns the current market value. However, we want to implement a rate-limiting mechanism to avoid server overload. In other words, the value should be cached but refreshed every 60 seconds.

Let’s update our sample code by adding suspension and an async call for updating the price:

actor State {
  private var _price = 0
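  // Outdated until the first successful refresh, then again after 60 seconds.
  private var isOutdated: Bool {
    guard let timestamp else { return true }
    return ContinuousClock.now - timestamp > .seconds(60)
  }
  private var timestamp: ContinuousClock.Instant?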

  func price() async throws -> Int {
    if isOutdated {
      try? await refresh()
    }

    return _price
  }

  private func refresh() async throws {
    _price = try await getCurrentMarketPrice()
    timestamp = ContinuousClock.now
  }
}

// Stand-in for the real API call: waits two seconds and returns a random price.
func getCurrentMarketPrice() async throws -> Int {
  try await Task.sleep(for: .seconds(2))
  return Int.random(in: 100...150)
}

The refresh call invokes getCurrentMarketPrice, which encapsulates the API call to the backend. At first, we might think this is all we need. However, there’s still a race condition due to actor reentrancy. While one caller is suspended at an await inside the actor, the actor is not blocked, and it can start executing work for another caller. In our scenario, that means refresh can be hit multiple times when we only need one API call to update the price. This case can get even more complicated when there are multiple await calls within refresh.
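The reentrancy problem can be observed with a small experiment. The sketch below is a simplified stand-in for the actor above, not the article's code: ReentrantCache, fetchCount, and runReentrancyDemo are hypothetical names, and the fake fetch just sleeps for 100 milliseconds. Five concurrent callers ask for the value, and the counter records how many fetches were started.

```swift
// Sketch only: a stripped-down cache that counts how often it fetches.
actor ReentrantCache {
  private(set) var fetchCount = 0
  private var cached: Int?

  func value() async -> Int {
    if let cached { return cached }
    // Suspension point: while this caller waits, the actor is free to
    // start value() for other callers, which also see `cached == nil`.
    let fresh = await fetch()
    cached = fresh
    return fresh
  }

  // Hypothetical stand-in for a slow network call.
  private func fetch() async -> Int {
    fetchCount += 1
    try? await Task.sleep(for: .milliseconds(100))
    return 42
  }
}

// Starts five concurrent callers and reports how many fetches ran.
func runReentrancyDemo() async -> Int {
  let cache = ReentrantCache()
  await withTaskGroup(of: Int.self) { group in
    for _ in 0..<5 {
      group.addTask { await cache.value() }
    }
    await group.waitForAll()
  }
  return await cache.fetchCount
}
```

The exact count depends on scheduling, but because each caller reaches the suspension point before any fetch has finished, the result is usually greater than one, which is exactly the duplicated work we want to avoid.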

One alternative is to add a second condition, such as a Boolean isLoading flag, as in the code block below.

actor State {
  private var _price = 0
  private var isLoading = false
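  // Outdated until the first successful refresh, then again after 60 seconds.
  private var isOutdated: Bool {
    guard let timestamp else { return true }
    return ContinuousClock.now - timestamp > .seconds(60)
  }
  private var timestamp: ContinuousClock.Instant?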

  func price() async throws -> Int {
    if isOutdated {
      if isLoading {
        // A refresh is already in flight. Create a policy?
      } else {
        try? await refresh()
      }
    }

    return _price
  }

  private func refresh() async throws {
    isLoading = true
    defer { isLoading = false }
    _price = try await getCurrentMarketPrice()
    timestamp = ContinuousClock.now
  }
}

// Stand-in for the real API call: waits two seconds and returns a random price.
func getCurrentMarketPrice() async throws -> Int {
  try await Task.sleep(for: .seconds(2))
  return Int.random(in: 100...150)
}

However, this state machine lacks a mechanism to signal that the currently running API call is done. The logic also gets more complicated when we want to handle unforeseen errors.

A better solution is to use something that behaves like a condition variable to signal suspended callers. This can be achieved with an optional Task, using its presence as a sign that a refresh call is running. If the state is outdated and the Task is nil, the caller creates a new Task, while subsequent callers await its completion.

actor State {
  private var task: Task<Void, Error>?
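  // Outdated until the first successful refresh, then again after 60 seconds.
  private var isOutdated: Bool {
    guard let timestamp else { return true }
    return ContinuousClock.now - timestamp > .seconds(60)
  }
  private var timestamp: ContinuousClock.Instant?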
  private var _price = 0

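  func price() async throws -> Int {
    if isOutdated {
      if task == nil {
        try refresh()
      }

      // Every caller awaits the same in-flight task; the task clears
      // itself when it finishes.
      try await task?.value
    }

    return _price
  }

  private func refresh() throws {
    guard task == nil else {
      struct RefreshError: Error {}
      throw RefreshError()
    }

    task = Task {
      // Reset the handle whether the refresh succeeds or throws, so a failed
      // call does not leave a finished task behind for every later caller.
      defer { task = nil }
      let price = try await getCurrentMarketPrice()
      timestamp = ContinuousClock.now
      _price = price
    }
  }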
}

// Stand-in for the real API call: waits two seconds and returns a random price.
func getCurrentMarketPrice() async throws -> Int {
  try await Task.sleep(for: .seconds(2))
  return Int.random(in: 100...150)
}

In this solution, refresh is no longer an async function, because the asynchronous work is wrapped inside a Task that returns Void but can throw an error. The logic inside price only invokes refresh when the task is nil, meaning no API call is in flight; every other caller awaits the same task. Once the task completes, the cached price is returned and the task is reset to nil for future updates.
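To check that concurrent callers really share one refresh, the same pattern can be exercised with a fetch counter. This is a sketch under stated assumptions rather than the article's exact actor: CoalescedState, fetchCount, fetch, and runCoalescingDemo are hypothetical names, a plain Boolean replaces the 60-second staleness check, and the fake fetch always returns 123.

```swift
// Sketch only: same shape as the actor above, instrumented for a demo.
actor CoalescedState {
  private(set) var fetchCount = 0
  private var task: Task<Void, Error>?
  private var isOutdated = true  // simplified stand-in for the time check
  private var _price = 0

  func price() async throws -> Int {
    if isOutdated {
      if task == nil {
        task = Task {
          // Clear the handle on success and on failure.
          defer { task = nil }
          _price = try await fetch()
          isOutdated = false
        }
      }
      // All callers that arrive during the refresh await the same task.
      try await task?.value
    }
    return _price
  }

  // Hypothetical stand-in for the backend call.
  private func fetch() async throws -> Int {
    fetchCount += 1
    try await Task.sleep(for: .milliseconds(100))
    return 123
  }
}

// Starts five concurrent callers and reports their prices and the fetch count.
func runCoalescingDemo() async throws -> (prices: [Int], fetches: Int) {
  let state = CoalescedState()
  let prices = try await withThrowingTaskGroup(of: Int.self, returning: [Int].self) { group in
    for _ in 0..<5 {
      group.addTask { try await state.price() }
    }
    var results: [Int] = []
    for try await price in group {
      results.append(price)
    }
    return results
  }
  let fetches = await state.fetchCount
  return (prices, fetches)
}
```

Each caller either finds a refresh in flight and awaits it, or arrives after it has finished and reads the cached value, so in this sketch only one fetch is started regardless of how the five callers are scheduled.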