Swift — Combine in-depth
The introduction of Combine will lead to a major shift in the app development ecosystem as Swift will be gradually embracing the power of reactive programming.
The Combine framework provides a declarative Swift API for processing values over time. These values can represent many kinds of asynchronous events. Combine declares publishers to expose values that can change over time, and subscribers to receive those values from the publishers.
It provides a native way of handling asynchronous events.
Combine’s underlying Publisher and Subscriber protocols lay the foundations of the framework.
— Publishers and Subscribers.
A publisher acts as a value provider: it delivers elements to one or more subscribers.
It must be tied to a subscriber through the `receive(subscriber:)` method.
Publishers communicate with their subscribers through several methods:
- receive(subscription:)
- receive(_ input: Input) -> Subscribers.Demand
- receive(completion:)
« Every Publisher must adhere to this contract for downstream subscribers to function correctly. »
Publishers provide a stream of elements which, once subscribed to, become accessible to their downstream subscribers.
The Publisher protocol declares two associated types, Output and Failure, which respectively represent the type of values it produces and the type of error it may fail with.
Conversely, the Subscriber protocol declares two associated types, Input and Failure.
Because a subscriber and a publisher are co-dependent, their associated types must match: the subscriber’s Input must be the publisher’s Output, and both Failure types must be identical.
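To make the contract more concrete, here is a minimal sketch of a hand-written subscriber. The CollectingSubscriber class is hypothetical and exists only for illustration; its Input and Failure types are chosen to match the Output and Failure of an [Int] sequence publisher.

```swift
import Combine

// Hypothetical subscriber, for illustration only.
// Its Input (Int) and Failure (Never) must match the publisher's Output and Failure.
final class CollectingSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var values: [Int] = []
    private(set) var isFinished = false

    // Called once, when the publisher acknowledges the subscribe request.
    func receive(subscription: Subscription) {
        subscription.request(.unlimited)
    }

    // Called for each element; the returned demand is added to the current one.
    func receive(_ input: Int) -> Subscribers.Demand {
        values.append(input)
        return .none
    }

    // Called at most once, when the stream ends.
    func receive(completion: Subscribers.Completion<Never>) {
        isFinished = true
    }
}

let collector = CollectingSubscriber()
// subscribe(_:) is the entry point Apple recommends; it forwards to receive(subscriber:).
[1, 2, 3, 4].publisher.subscribe(collector)
// collector.values is now [1, 2, 3, 4] and collector.isFinished is true
```

Changing Input to String in this sketch would make the subscribe(_:) call fail to compile, which is how the type matching is enforced.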
Declaring a publisher is pretty straightforward.
Publishers.Sequence<[Int], Never>(sequence: [1, 2, 3, 4])
This declares a Sequence publisher: a Sequence struct nested in the Publishers enum, which serves as a namespace for the built-in publisher types.
It needs one or more subscribers, attached through the receive(subscriber:) method, to start delivering elements.
let subscriber = Subscribers.Sink<Int, Never>(
    receiveCompletion: { completion in
        print(completion)
    }) { value in
        print(value)
    }
Publishers
    .Sequence<[Int], Never>(sequence: [1, 2, 3, 4])
    .receive(subscriber: subscriber)
The Sink class « creates the subscriber and immediately requests an unlimited number of values, prior to returning the subscriber ».
It is initialized using both receiveCompletion and receiveValue closures providing a convenient way to process the values from the publisher, as they come, and be notified once it has completed.
A concrete Input type (e.g. Int) is explicitly declared, as well as a concrete Failure type (e.g. Never).
// prints 1
// prints 2
// prints 3
// prints 4
// prints finished
- At core
First, the publisher receives the subscriber and acknowledges the subscribe request by handing it a Subscription instance (here, one backed by the [Int] sequence).
Then, the subscriber requests a number of values through that subscription; the Sink class implicitly requests an unlimited number.
Each value is then delivered by the upstream publisher to the subscriber.
Finally, because it can no longer produce elements, the publisher sends a `finished` completion event, which ends its relationship with the subscriber.
From then on, the two no longer communicate.
Combine’s core implementation revolves around upstream publishers and downstream subscribers, each tightly dependent on the other.
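One way to observe this sequence of events is the handleEvents() operator, which exposes a hook for each step. The sketch below records the steps in a hypothetical `events` array instead of printing them; the labels are arbitrary.

```swift
import Combine

// Hypothetical log of lifecycle steps, filled in by the hooks below.
var events: [String] = []

let cancellable = [1, 2].publisher
    .handleEvents(
        receiveSubscription: { _ in events.append("subscription") },
        receiveOutput: { events.append("value \($0)") },
        receiveCompletion: { _ in events.append("completion") },
        receiveRequest: { _ in events.append("request") }
    )
    .sink { _ in }

// events: ["subscription", "request", "value 1", "value 2", "completion"]
```

The print() operator gives a similar trace on the console with less setup, which is often enough when debugging.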
A Sequence publisher can also be declared more simply, using the built-in `publisher` property.
[1, 2, 3, 4].publisher
.sink(receiveCompletion: { completion in
print(completion)
}) { value in
print(value)
}
The sink() method immediately creates the subscriber: values are delivered straight away, and the sequence terminates with a completion event, just like before.
— Subjects
Subjects act both as a Subscriber and a Publisher.
While a subject can receive values from an upstream publisher, it can also pass along these values to its downstream subscribers.
let currentSubject = CurrentValueSubject<Int, Never>(1)
A CurrentValueSubject holds an initial value and publishes its current value to each new subscriber.
let passthroughSubject = PassthroughSubject<Int, Never>()
A PassthroughSubject holds no value: it simply forwards the values sent to it, and drops those sent while it has no subscriber.
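The difference between the two subjects shows up at subscription time. A minimal sketch, with hypothetical array names used only to collect what each subscriber receives:

```swift
import Combine

let current = CurrentValueSubject<Int, Never>(1)
let passthrough = PassthroughSubject<Int, Never>()

// Sent before anyone subscribes: the passthrough subject drops it.
passthrough.send(0)

var currentValues: [Int] = []
var passthroughValues: [Int] = []

// The CurrentValueSubject replays its current value (1) on subscription.
let currentToken = current.sink { currentValues.append($0) }
let passthroughToken = passthrough.sink { passthroughValues.append($0) }

current.send(2)
passthrough.send(2)

// currentValues is [1, 2], passthroughValues is [2]
```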
- At core
First, let’s declare an upstream publisher.
let publisher = [1, 2, 3, 4].publisher
Then, a passthrough subject.
let passthroughSubject = PassthroughSubject<Int, Never>()
We need a way to bind them together, as the receive(subscriber:) method only accepts a Subscriber type.
AnySubscriber comes in handy as it provides an init(_ s: S) initializer, where S is a Subject, which wraps our PassthroughSubject and makes it usable by our publisher.
let anySubscriber = AnySubscriber(passthroughSubject)
let publisher = [1, 2, 3, 4].publisher
publisher.receive(subscriber: anySubscriber)
We declared an upstream publisher and a downstream subscriber, but we could go a bit further.
As mentioned earlier, since a subject also acts as a publisher, we could use the sink() method on the PassthroughSubject to immediately create another downstream subscriber tied to that subject.
let passthroughSubject = PassthroughSubject<Int, Never>()
let anySubscriber = AnySubscriber(passthroughSubject)
let newSubscriber = passthroughSubject
    .sink(receiveCompletion: { completion in
        print(completion)
    }) { value in
        print(value)
    }
let publisher = [1, 2, 3, 4].publisher
publisher.receive(subscriber: anySubscriber)
The passthroughSubject instance acts as a subscriber, since it receives elements from its upstream publisher (the `publisher` instance), while also acting as a publisher that passes the received elements to its downstream subscriber (the `newSubscriber` instance).
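As an alternative to wrapping the subject in an AnySubscriber, Publisher also offers a subscribe(_:) overload that accepts a subject directly and returns an AnyCancellable. A minimal sketch of the same relay, with a hypothetical `relayed` array standing in for the print statements:

```swift
import Combine

let relay = PassthroughSubject<Int, Never>()
var relayed: [Int] = []
var relayFinished = false

// Downstream subscriber, tied to the subject.
let downstream = relay.sink(
    receiveCompletion: { _ in relayFinished = true },
    receiveValue: { relayed.append($0) }
)

// Upstream publisher, feeding the subject without an explicit AnySubscriber.
let upstream = [1, 2, 3, 4].publisher.subscribe(relay)

// relayed is [1, 2, 3, 4] and relayFinished is true
```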
The CurrentValueSubject allows direct access to its underlying value through the `value` property, and provides a way to pass a new value through the send(_:) method.
let currentValueSubject = CurrentValueSubject<Int, Never>(1)
print(currentValueSubject.value)
// prints 1
currentValueSubject.send(2)
print(currentValueSubject.value)
// prints 2
currentValueSubject.send(completion: .finished)
currentValueSubject.send(10) // ignored: the subject has already completed
print(currentValueSubject.value)
// prints 2
Once a subject — or a subscriber — receives a completion event, it will no longer receive inputs.
— Operators
Combine includes a handful of operators providing a convenient way to process values from an ongoing stream prior to delivering them to the subscriber.
The various operators defined as extensions on Publisher implement their functionality as classes or structures that extend the Publishers enumeration. For example, the contains(_:) operator returns a Publishers.Contains instance.
Some of these operators mirror functions that already exist in the Swift standard library.
map // Publishers.Map
flatMap // Publishers.FlatMap
filter // Publishers.Filter
reduce // Publishers.Reduce
Some of them are new.
tryMap // Publishers.TryMap
tryCatch // Publishers.TryCatch
decode // Publishers.Decode
replaceError // Publishers.ReplaceError
let passthroughSubject = PassthroughSubject<String, Never>()
let anySubscriber = AnySubscriber(passthroughSubject)
// Keep the returned AnyCancellable alive, otherwise the subscription is cancelled.
let cancellable = passthroughSubject.sink(
    receiveCompletion: { completion in
        print(completion)
    }) { value in
        print(value)
    }
[1, 2, 3, 4].publisher
    .filter { $0.isMultiple(of: 2) }
    .map { "My even number is \(String($0))" }
    .receive(subscriber: anySubscriber)
// prints My even number is 2
// prints My even number is 4
// prints finished
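The error-handling operators listed above can be combined in the same way. A minimal sketch, assuming a hypothetical ParsingError type and a fallback value of -1 chosen arbitrarily for illustration:

```swift
import Combine

// Hypothetical error type, for illustration only.
struct ParsingError: Error {}

var parsed: [Int] = []

let parsing = ["1", "2", "x", "4"].publisher
    .tryMap { text -> Int in
        // Throwing here fails the stream with the thrown error.
        guard let number = Int(text) else { throw ParsingError() }
        return number
    }
    // Replaces the failure with a single fallback value, then finishes.
    .replaceError(with: -1)
    .sink { parsed.append($0) }

// parsed is [1, 2, -1]: "4" is never processed, since a failure terminates the stream
```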
— URL Session
Let’s fully explore the power of reactive programming by making an asynchronous network request using Combine’s built-in publisher `dataTaskPublisher`.
var subscriptions = Set<AnyCancellable>()
let dataTaskPublisher = URLSession.shared.dataTaskPublisher(
    for: URL(
        string: "https://jsonplaceholder.typicode.com/posts"
    )!
)
dataTaskPublisher
    .retry(2)
    .map(\.data)
    .decode(type: [Post].self, decoder: JSONDecoder())
    .replaceError(with: [])
    .receive(on: DispatchQueue.main)
    .sink { posts in
        print("There are \(posts.count) posts")
    }
    .store(in: &subscriptions)
// prints There are 100 posts
Such a declarative API is very convenient when it comes to handling complex asynchronous requests while maintaining code readability.
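The Post type used in the pipeline is not defined in this article. A minimal sketch, assuming a Decodable struct that only keeps the `id` and `title` fields (an assumption about the JSON shape), and decoding an inline JSON sample rather than a network response:

```swift
import Combine
import Foundation

// Assumed model: only two fields of the JSON payload are kept here.
struct Post: Decodable {
    let id: Int
    let title: String
}

// Inline sample standing in for the data returned by the service.
let sample = Data(#"[{"id": 1, "title": "Hello"}, {"id": 2, "title": "World"}]"#.utf8)

var titles: [String] = []

let decoding = Just(sample)
    .decode(type: [Post].self, decoder: JSONDecoder())
    .replaceError(with: [])
    .sink { posts in titles = posts.map { $0.title } }

// titles is ["Hello", "World"]
```

Feeding the operators with Just keeps the decoding step testable without any network access.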
The Publishers enum provides a struct called CombineLatest allowing us to receive the latest elements from two publishers.
let postsDataTaskPublisher = URLSession.shared.dataTaskPublisher(
    for: URL(
        string: "https://jsonplaceholder.typicode.com/posts"
    )!
)
let commentsDataTaskPublisher = URLSession.shared.dataTaskPublisher(
    for: URL(
        string: "https://jsonplaceholder.typicode.com/comments"
    )!
)
let postsPublisher = postsDataTaskPublisher
    .retry(2)
    .map(\.data)
    .decode(type: [Post].self, decoder: JSONDecoder())
    .replaceError(with: [])
let commentsPublisher = commentsDataTaskPublisher
    .retry(2)
    .map(\.data)
    .decode(type: [Comment].self, decoder: JSONDecoder())
    .replaceError(with: [])
Publishers.CombineLatest(postsPublisher, commentsPublisher)
    .sink { posts, comments in
        print("There are \(posts.count) posts")
        print("There are \(comments.count) comments")
    }
    .store(in: &subscriptions)
// prints There are 100 posts
// prints There are 500 comments
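CombineLatest only emits once both upstream publishers have produced a value, then emits again whenever either one changes. A minimal sketch using two subjects in place of the network publishers (the names and values are arbitrary):

```swift
import Combine

let numbers = PassthroughSubject<Int, Never>()
let letters = PassthroughSubject<String, Never>()

var pairs: [String] = []

let combining = Publishers.CombineLatest(numbers, letters)
    .sink { number, letter in pairs.append("\(number)\(letter)") }

numbers.send(1)   // nothing yet: letters has not emitted
letters.send("x") // emits (1, "x")
numbers.send(2)   // emits (2, "x")
letters.send("y") // emits (2, "y")

// pairs is ["1x", "2x", "2y"]
```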
Alternatively, we could use a fetchPosts() method which returns a publisher.
private enum Error: Swift.Error {
    case invalidResponse
    case invalidJSON
}
private func fetchPosts() -> AnyPublisher<[Post], Swift.Error> {
    let url = URL(
        string: "https://jsonplaceholder.typicode.com/posts"
    )!
    return URLSession.shared.dataTaskPublisher(for: url)
        .tryMap { data, response -> Data in
            guard let httpResponse = response as? HTTPURLResponse,
                httpResponse.statusCode == 200 else {
                    throw Error.invalidResponse
            }
            return data
        }
        .tryMap { data -> [Post] in
            guard let posts = try?
                JSONDecoder().decode([Post].self, from: data) else {
                    throw Error.invalidJSON
            }
            return posts
        }
        .eraseToAnyPublisher()
}
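Because such a publisher can fail, its subscriber has to handle the completion event as well as the values. A minimal sketch of the consuming side, where a hypothetical failingFetch() function built on the Fail publisher simulates an invalid response instead of hitting the network:

```swift
import Combine

// Hypothetical error and function, simulating a request that fails.
enum FetchError: Error {
    case invalidResponse
}

func failingFetch() -> AnyPublisher<[String], Error> {
    Fail<[String], Error>(error: FetchError.invalidResponse)
        .eraseToAnyPublisher()
}

var outcome = "pending"

let fetching = failingFetch()
    .sink(receiveCompletion: { completion in
        switch completion {
        case .finished:
            outcome = "finished"
        case .failure(let error):
            outcome = "failed: \(error)"
        }
    }, receiveValue: { posts in
        outcome = "received \(posts.count) posts"
    })

// outcome now starts with "failed"
```

Keeping the error in the publisher's type, rather than replacing it with an empty array, lets the caller decide how to report the failure.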
— UI Binding
The assign(to:on:) operator is helpful when it comes to binding « the value of a KVO-compliant property from the publisher ».
Let’s populate our cells from our post objects.
func tableView(_ tableView: UITableView, cellForRowAt indexPath: IndexPath) -> UITableViewCell {
    let cell = tableView.dequeueReusableCell(
        withIdentifier: PostCell.identifier,
        for: indexPath
    ) as! PostCell
    let postsDataTaskPublisher = URLSession.shared.dataTaskPublisher(
        for: URL(
            string: "https://jsonplaceholder.typicode.com/posts"
        )!
    )
    cell.subscriber = postsDataTaskPublisher
        .retry(2)
        .map(\.data)
        .decode(type: [Post].self, decoder: JSONDecoder())
        .replaceError(with: [])
        .map { posts -> String? in
            // The array is empty when the request failed, so guard the index.
            posts.indices.contains(indexPath.row) ? posts[indexPath.row].title : nil
        }
        .receive(on: DispatchQueue.main)
        .assign(to: \.textLabel!.text, on: cell)
    return cell
}
final class PostCell: UITableViewCell {
    var subscriber: AnyCancellable?
    static let identifier = "PostCell"
    override func prepareForReuse() {
        super.prepareForReuse()
        // Stop the pending binding before the cell is reused for another row.
        subscriber?.cancel()
    }
    override init(
        style: UITableViewCell.CellStyle,
        reuseIdentifier: String?
    ) {
        super.init(style: style, reuseIdentifier: reuseIdentifier)
    }
    required init?(coder: NSCoder) {
        fatalError("init(coder:) has not been implemented")
    }
}
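The binding itself does not depend on UIKit. A minimal sketch of assign(to:on:) on a plain class, where the hypothetical TitleModel stands in for the cell and its label:

```swift
import Combine

// Hypothetical model object, standing in for a cell's label.
final class TitleModel {
    var text: String?
}

let model = TitleModel()

// assign(to:on:) requires a publisher that cannot fail (Failure == Never)
// and whose Output matches the property type (String? here).
let binding = Just<String?>("Hello")
    .assign(to: \.text, on: model)

// model.text is now "Hello"
```

Note that assign(to:on:) keeps a strong reference to the target object until the subscription ends or is cancelled, which is one reason the cell above cancels it on reuse.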
— Cancellable
Whenever a subscriber no longer needs to receive elements from a publisher, it may cancel its subscription.
« The subscriber types created by sink(receiveCompletion:receiveValue:) and assign(to:on:) both implement the Cancellable protocol, which provides a cancel() method »
let dataTaskPublisher = URLSession.shared.dataTaskPublisher(
    for: URL(
        string: "https://jsonplaceholder.typicode.com/posts"
    )!
)
// sink returns an AnyCancellable; keep it in a constant so that it can be cancelled.
let postsSubscription = dataTaskPublisher
    .retry(2)
    .map(\.data)
    .decode(type: [Post].self, decoder: JSONDecoder())
    .replaceError(with: [])
    .sink { posts in
        print("There are \(posts.count) posts")
    }
postsSubscription.cancel()
This won’t print anything: the request is asynchronous, and the cancel() method is called before the data has been fetched from the service.
Replacing the immediate call with a delayed one gives the request time to complete.
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
    postsSubscription.cancel()
}
// prints There are 100 posts
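The effect of cancel() is easier to see without the network. A minimal sketch with a subject, where the hypothetical `received` array records what reaches the subscriber:

```swift
import Combine

let source = PassthroughSubject<Int, Never>()
var received: [Int] = []

let token = source.sink { received.append($0) }

source.send(1)  // delivered
token.cancel()  // the subscriber stops receiving elements
source.send(2)  // not delivered

// received is [1]
```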
— Type erasers
AnySubscriber and AnyPublisher respectively wrap a Subscriber and a Publisher.
They come in handy when the details of the underlying object must not be exposed across access-control boundaries.
A good use case is when dealing with closures.
Your app may also have the common pattern of using a closure as a property to invoke when certain events happen. With Combine, you can replace this pattern by using a Subject. A subject allows you to imperatively publish a new element at any time by calling the send() method. Adopt this pattern by using a private PassthroughSubject or CurrentValueSubject, then expose this publicly as an AnyPublisher.
A View Controller.
private lazy var passthroughSubject = PassthroughSubject<String, Never>()
// Only the type-erased publisher is exposed: other objects can subscribe, but not send.
lazy var anyPublisher: AnyPublisher<String, Never> = passthroughSubject.eraseToAnyPublisher()
passthroughSubject.send("Hello world !")
Another View Controller.
vc.anyPublisher.sink(receiveCompletion: { completion in
    print(completion)
}) { value in
    print(value)
}.store(in: &subscriptions)
AnySubject wrapped a subject in the early betas; it is no longer part of the released framework.
AnyCancellable is a type-erased class that automatically calls cancel() when it is deinitialized, relying on Swift’s memory management.
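A minimal sketch of that behaviour: releasing the only reference to the AnyCancellable is enough to end the subscription, without calling cancel() explicitly. The names below are arbitrary.

```swift
import Combine

let ticker = PassthroughSubject<Int, Never>()
var ticks: [Int] = []

// The optional holds the only strong reference to the AnyCancellable.
var lifetime: AnyCancellable? = ticker.sink { ticks.append($0) }

ticker.send(1)   // delivered
lifetime = nil   // deinit runs, which cancels the subscription
ticker.send(2)   // not delivered

// ticks is [1]
```

This is why subscriptions are usually stored in a property or in a Set of AnyCancellable owned by the object that needs them.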
— RxSwift vs. Combine
In RxSwift terms:
- Publishers are Observables
- Subscribers are Observers
- Cancellable is Disposable
- CurrentValueSubject is BehaviorSubject
- PassthroughSubject is PublishSubject
- eraseToAnyPublisher is asObservable
- handleEvents() is do()
Conclusion
With Combine, Swift takes a significant leap towards reactive programming making it easier to deal with asynchronous events in our apps.
While adoption will be progressive, as Combine is still in its early days, the power of such a declarative API will definitely enhance the app development process.