Combine — Single-valued publisher

Jullian Mercier
3 min readApr 3, 2020

While creating a custom operator out of Combine’s built-in ones in the `Publisher` extension is a useful (and quick!) technique to harmonize some common reactive behaviors, it is sometimes necessary to provide with some more in-depth customization to achieve our goal.

Combine enables us to conform to the `Publisher`, `Subscription` and `Subscriber` protocols to create a tailored operator and gain control over our flow.

Our purpose is to create a single-valued publisher which will deliver one value and completes immediately, or fails.

— Part 1: Creating a custom Publisher

First, we need to extend the `Publishers` enum which is « a namespace for types related to the Publisher protocol » in which we create a `AsSingle` struct that conforms to the `Publisher` protocol and has one required `receive<S>(subscriber: S)` method.

From that point on, we’ll be dealing with generics namely the Upstream type which stands for the publisher and the Downstream type which is the subscriber.

extension Publishers {
struct AsSingle<Upstream: Publisher>: Publisher {
typealias Output = Upstream.Output
typealias Failure = Upstream.Failure
private let upstream: Upstream init(upstream: Upstream) {
self.upstream = upstream
}
func receive<S>(subscriber: S) where S: Subscriber, Failure == S.Failure, Output == S.Input { subscriber.receive(subscription: Subscription(upstream: upstream, downstream: subscriber))
}
}
}

— Part 2: Creating a custom Subscription

When the publisher receives a subscriber, the subscriber needs to receive a subscription that holds an instance of a `AsSingleSink` object (a custom subscriber).

The `sink` instance will be nullified whenever the cancel() method is triggered which causes the publisher to stop delivering values.

extension Publishers.Single {
class Subscription<Downstream: Subscriber>: Combine.Subscription where Upstream.Output == Downstream.Input, Upstream.Failure == Downstream.Failure {
private var sink: AsSingleSink<Upstream, Downstream>?

init(upstream: Upstream, downstream: Downstream) {
sink = .init(upstream: upstream, downstream: downstream)
}
func request(_ demand: Subscribers.Demand) { } func cancel() {
sink = nil
}
}
}

— Part 3: Creating a custom Subscriber

The final part is to create a subscriber to manage our flow using three required methods.

  • receive(subscription: Subscription)
  • receive(_ input: Upstream.Output) -> Subscribers.Demand
  • receive(completion: Subscribers.Completion<Upstream.Failure>)

We’ll only request a single value to the upstream publisher using `subscription.request(.max(1)`

The first input will be delivered to the downstream subscriber immediately followed by a `finished` event so we can match the desired behavior.

We also need to hold an optional variable to check whether the sequence is not empty otherwise we provide with a `fatalError` in the completion event.

class AsSingleSink<Upstream: Publisher, Downstream: Subscriber>: Subscriber where Upstream.Output == Downstream.Input, Downstream.Failure == Upstream.Failure {    private var downstream: Downstream
private var _element: Upstream.Output?
init(upstream: Upstream, downstream: Downstream) {
self.downstream = downstream
upstream.subscribe(self)
}
func receive(subscription: Subscription) {
subscription.request(.max(1))
}
func receive(_ input: Upstream.Output) -> Subscribers.Demand {
_element = input
_ = downstream.receive(input)
downstream.receive(completion: .finished)
return .none
}
func receive(completion: Subscribers.Completion<Upstream.Failure>) {
switch completion {
case .failure(let err):
downstream.receive(completion: .failure(err))
case .finished:
if _element == nil {
fatalError("❌ Sequence doesn’t contain any elements.")
}
}
}
}

— Part 5: Creating the operator

The final part is to create the `asSingle` method in a `Publisher` extension and simply returns an `AsSingle` instance.

extension Publisher {
func asSingle() -> Publishers.AsSingle<Self> {
return Publishers.AsSingle(upstream: self)
}
}

— Part 6: Use case

The subscription terminates as soon as the subscriber receives one value.

var subscriptions = Set<AnyCancellable>()[1, 2, 3]
.publisher
.asSingle()
.sink(receiveCompletion: { c in
print(c)
}, receiveValue: { val in
print(val)
}).store(in: &subscriptions)
// 1
// finished

If the sequence doesn’t contain any elements, it ends up with a `fatalError`.

[]
.publisher
.asSingle()
.sink(receiveCompletion: { c in
print(c)
}, receiveValue: { val in
print(val)
}).store(in: &subscriptions)
// Fatal error: ❌ Sequence doesn’t contain any elements.

Note: In RxSwift, the `asSingle()` operator behaves quite similarly however turning an `Observable` sequence with more than one value or no value at all into a `Single` and subscribing to it ends up with an `RxError`.

The above implementation is highly based upon the CombineExt repository on Github.

Follow me on Twitter for my latest articles.

--

--

Jullian Mercier

Senior iOS engineer. jullianmercier.com. @jullian_mercier. Currently looking for new job opportunities.