Combine — Single-valued publisher
Building a custom operator from Combine’s built-in ones in a `Publisher` extension is a useful (and quick!) way to factor out common reactive behaviors, but sometimes deeper customization is needed to reach our goal.
Combine enables us to conform to the `Publisher`, `Subscription` and `Subscriber` protocols to create a tailored operator and gain control over our flow.
Our goal is to create a single-valued publisher that delivers one value and then completes immediately, or fails.
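For comparison, the quicker technique mentioned at the start (composing built-in operators in a `Publisher` extension) can get close to this behavior. The sketch below is only an illustration: `firstValueOnly` is a hypothetical name, not part of Combine, and it simply wraps the built-in `first()` operator.

```swift
import Combine

extension Publisher {
    // `firstValueOnly` is a hypothetical name used for illustration only.
    // It forwards the first upstream value and then finishes.
    func firstValueOnly() -> AnyPublisher<Output, Failure> {
        first().eraseToAnyPublisher()
    }
}

var firstValues: [Int] = []
var firstDidFinish = false

let firstCancellable = [1, 2, 3].publisher
    .firstValueOnly()
    .sink(receiveCompletion: { completion in
        if case .finished = completion { firstDidFinish = true }
    }, receiveValue: { value in
        firstValues.append(value)
    })
```

Note that `first()` simply finishes when the upstream is empty, so this shortcut gives no control over the empty case. That is the kind of customization the rest of the article is after.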
— Part 1: Creating a custom Publisher
First, we extend the `Publishers` enum, which is « a namespace for types related to the Publisher protocol ». Inside it, we create an `AsSingle` struct that conforms to the `Publisher` protocol and implements its one required method, `receive<S>(subscriber: S)`.
From that point on, we’ll be dealing with generics, namely the `Upstream` type, which stands for the publisher, and the `Downstream` type, which is the subscriber.
import Combine

extension Publishers {
    struct AsSingle<Upstream: Publisher>: Publisher {
        typealias Output = Upstream.Output
        typealias Failure = Upstream.Failure

        private let upstream: Upstream

        init(upstream: Upstream) {
            self.upstream = upstream
        }

        func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input {
            subscriber.receive(subscription: Subscription(upstream: upstream, downstream: subscriber))
        }
    }
}
— Part 2: Creating a custom Subscription
When the publisher receives a subscriber, the subscriber needs to receive a subscription that holds an instance of an `AsSingleSink` object (a custom subscriber).
The `sink` instance is set to `nil` whenever the `cancel()` method is called, which causes the publisher to stop delivering values.
extension Publishers.AsSingle {
    class Subscription<Downstream: Subscriber>: Combine.Subscription where Upstream.Output == Downstream.Input, Upstream.Failure == Downstream.Failure {
        private var sink: AsSingleSink<Upstream, Downstream>?

        init(upstream: Upstream, downstream: Downstream) {
            sink = .init(upstream: upstream, downstream: downstream)
        }

        // Demand is requested by the sink itself, so there is nothing to do here.
        func request(_ demand: Subscribers.Demand) { }

        func cancel() {
            sink = nil
        }
    }
}
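The effect of cancelling a subscription can be observed in isolation with Combine’s built-in types. The sketch below is independent of `AsSingle`: it uses a `PassthroughSubject` and the built-in `sink` purely for illustration, and the variable names are hypothetical.

```swift
import Combine

let cancelSubject = PassthroughSubject<Int, Never>()
var valuesBeforeCancel: [Int] = []

// Subscribe and record every value that arrives.
let cancelToken = cancelSubject.sink { value in
    valuesBeforeCancel.append(value)
}

cancelSubject.send(1)   // delivered
cancelToken.cancel()    // tears down the subscription
cancelSubject.send(2)   // no longer delivered
```

After `cancel()` is called, later values sent by the subject never reach the closure, which is the behavior our custom `Subscription` aims for when it drops its sink.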
— Part 3: Creating a custom Subscriber
The next step is to create a subscriber that manages our flow through three required methods.
- receive(subscription: Subscription)
- receive(_ input: Upstream.Output) -> Subscribers.Demand
- receive(completion: Subscribers.Completion<Upstream.Failure>)
We’ll request only a single value from the upstream publisher using `subscription.request(.max(1))`.
The first input will be delivered to the downstream subscriber, immediately followed by a `finished` event, so we can match the desired behavior.
We also need to hold an optional variable to check whether the sequence is empty; if it is, we raise a `fatalError` in the completion event.
class AsSingleSink<Upstream: Publisher, Downstream: Subscriber>: Subscriber where Upstream.Output == Downstream.Input, Downstream.Failure == Upstream.Failure {
    private var downstream: Downstream
    private var _element: Upstream.Output?

    init(upstream: Upstream, downstream: Downstream) {
        self.downstream = downstream
        upstream.subscribe(self)
    }

    func receive(subscription: Subscription) {
        // Ask the upstream publisher for a single value only.
        subscription.request(.max(1))
    }

    func receive(_ input: Upstream.Output) -> Subscribers.Demand {
        _element = input
        // Forward the value, then finish right away.
        _ = downstream.receive(input)
        downstream.receive(completion: .finished)
        return .none
    }

    func receive(completion: Subscribers.Completion<Upstream.Failure>) {
        switch completion {
        case .failure(let err):
            downstream.receive(completion: .failure(err))
        case .finished:
            // Upstream finished without emitting anything.
            if _element == nil {
                fatalError("❌ Sequence doesn’t contain any elements.")
            }
        }
    }
}
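To see what a `.max(1)` request does on its own, here is a minimal sketch of a standalone subscriber attached to a built-in sequence publisher. `OneValueSubscriber` is a hypothetical type written for this illustration; it is not part of Combine or of the implementation above.

```swift
import Combine

// Hypothetical subscriber that asks for exactly one value and records it.
final class OneValueSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var values: [Int] = []

    func receive(subscription: Subscription) {
        // Request a single value; the publisher must not send more than that.
        subscription.request(.max(1))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        values.append(input)
        // Returning .none means no additional demand is added.
        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) { }
}

let oneValueSubscriber = OneValueSubscriber()
[1, 2, 3].publisher.subscribe(oneValueSubscriber)
```

Because the demand never goes beyond one, the subscriber only ever sees the first element, even though the upstream has more to offer. This is why `AsSingleSink` has to send the `finished` event itself.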
— Part 4: Creating the operator
The final part is to create the `asSingle` method in a `Publisher` extension, which simply returns an `AsSingle` instance.
extension Publisher {
    func asSingle() -> Publishers.AsSingle<Self> {
        return Publishers.AsSingle(upstream: self)
    }
}
— Part 5: Use case
The subscription terminates as soon as the subscriber receives one value.
var subscriptions = Set<AnyCancellable>()

[1, 2, 3]
    .publisher
    .asSingle()
    .sink(receiveCompletion: { c in
        print(c)
    }, receiveValue: { val in
        print(val)
    }).store(in: &subscriptions)

// 1
// finished
If the sequence doesn’t contain any elements, it ends up with a `fatalError`.
// An empty array literal needs an explicit element type to compile.
[Int]()
    .publisher
    .asSingle()
    .sink(receiveCompletion: { c in
        print(c)
    }, receiveValue: { val in
        print(val)
    }).store(in: &subscriptions)

// Fatal error: ❌ Sequence doesn’t contain any elements.
Note: In RxSwift, the `asSingle()` operator behaves quite similarly. However, turning an `Observable` sequence with more than one value, or no value at all, into a `Single` and subscribing to it results in an `RxError`.
The above implementation is heavily based on the CombineExt repository on GitHub.