Combine — Creating a custom subscriber

Jullian Mercier
2 min read · Feb 23, 2020


While Combine provides us with a `sink` method that conveniently "creates the subscriber and immediately requests an unlimited number of values, prior to returning the subscriber", the framework also lets us define our own subscriber by conforming to the `Subscriber` protocol.
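For comparison, here is a minimal sketch of the `sink` route. The variable names (`received`, `didFinish`, `cancellable`) are illustrative and not part of the original example.

```swift
import Combine

var received: [Int] = []
var didFinish = false

// `sink` creates the subscriber for us and requests unlimited values.
let cancellable = [1, 2, 3].publisher.sink(
    receiveCompletion: { completion in
        if case .finished = completion { didFinish = true }
        print("Completion: \(completion)")
    },
    receiveValue: { value in
        received.append(value)
        print("Value:", value)
    }
)
// Value: 1
// Value: 2
// Value: 3
// Completion: finished
```

With `sink` there is no way to throttle the demand, which is what a custom subscriber gives us.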

— Use case

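import Combine

class CustomSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    // Called once, when the subscriber is attached to a publisher.
    func receive(subscription: Subscription) {
        subscription.request(.max(1))
    }

    // Called for each value; the returned demand is added to the current one.
    func receive(_ input: Int) -> Subscribers.Demand {
        print("Value:", input)
        return .none
    }

    // Called when the publisher finishes.
    func receive(completion: Subscribers.Completion<Never>) {
        print("Completion: \(completion)")
    }
}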

Since the `Subscriber` protocol has two associated types, `Input` and `Failure`, we must specify the type of values our custom subscriber will receive as well as the type of error it can receive.

There are also three method requirements to implement.

  1. receive(subscription: Subscription)
  2. receive(_ input: Input) -> Subscribers.Demand
  3. receive(completion: Subscribers.Completion<Failure>)

The `receive(subscription: Subscription)` method is called once, as soon as the subscriber is attached to the publisher.

This is where we request a number of values from the subscription object by passing a `Subscribers.Demand`, which can be `.none`, `.unlimited` or `.max(_:)`.
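As a quick illustration, the three kinds of demand can be built and inspected directly. This is only a sketch; the constant names are made up for the example.

```swift
import Combine

let noDemand = Subscribers.Demand.none
let three = Subscribers.Demand.max(3)
let unlimited = Subscribers.Demand.unlimited

// `.none` is the same demand as `.max(0)`.
print(noDemand == .max(0))     // true

// `max` exposes the requested count, or nil when the demand is unlimited.
print(three.max == 3)          // true
print(unlimited.max == nil)    // true
```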

The `receive(_ input:) -> Subscribers.Demand` method is called each time a new value is delivered. This is the right moment to reevaluate how many values are needed.

For instance, you could return `.none` to stick to the initial request or ask for two more values each time a new input comes in.

Note that the returned demand is added to whatever is still outstanding from earlier requests, so you can only increase the total number of values requested, never reduce it.
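To make the additive behaviour concrete, here is a small sketch. `AdditiveSubscriber` is a hypothetical name used only for this illustration: it requests two values up front and asks for one extra value on the first input only, so three values arrive in total.

```swift
import Combine

final class AdditiveSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private(set) var received: [Int] = []

    func receive(subscription: Subscription) {
        subscription.request(.max(2))   // initial demand: 2
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        // One extra value, requested on the first input only: 2 + 1 = 3.
        return input == 1 ? .max(1) : .none
    }

    func receive(completion: Subscribers.Completion<Never>) {}
}

let additive = AdditiveSubscriber()
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10].publisher.subscribe(additive)
print(additive.received)   // [1, 2, 3]
```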

let publisher = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10].publisher
let subscriber = CustomSubscriber()
publisher.subscribe(subscriber)

// Value: 1

No completion event is printed: we requested only one value, so the publisher stops after delivering it and never reaches the end of its ten-element sequence.
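One way to see this is to keep a reference to the subscription and ask for the remaining values later. The sketch below is an assumption-laden illustration rather than part of the original example: `PullSubscriber` and its `requestMore(_:)` helper are hypothetical names.

```swift
import Combine

final class PullSubscriber: Subscriber {
    typealias Input = Int
    typealias Failure = Never

    private var subscription: Subscription?
    private(set) var received: [Int] = []
    private(set) var didFinish = false

    func receive(subscription: Subscription) {
        self.subscription = subscription   // keep it so we can ask again later
        subscription.request(.max(1))
    }

    func receive(_ input: Int) -> Subscribers.Demand {
        received.append(input)
        return .none
    }

    func receive(completion: Subscribers.Completion<Never>) {
        didFinish = true
    }

    // Hypothetical helper: forwards an extra demand to the stored subscription.
    func requestMore(_ demand: Subscribers.Demand) {
        subscription?.request(demand)
    }
}

let pull = PullSubscriber()
[1, 2, 3].publisher.subscribe(pull)
print(pull.received, pull.didFinish)   // [1] false

pull.requestMore(.unlimited)
print(pull.received, pull.didFinish)   // [1, 2, 3] true
```

The completion only arrives once the remaining values have been requested and delivered.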

Let’s now demand an unlimited number of values.

func receive(_ input: Int) -> Subscribers.Demand {
    print("Value:", input)
    return .unlimited
}
// Value: 1
// Value: 2
// Value: 3
// Value: 4
// Value: 5
// Value: 6
// Value: 7
// Value: 8
// Value: 9
// Value: 10
// Completion: finished

Let’s control our flow by conditioning our demand in the `receive(_ input:) -> Subscribers.Demand` method.

func receive(_ input: Int) -> Subscribers.Demand {
    print("Value:", input)
    return (input == 3) ? .none : .max(1)
}
// Value: 1
// Value: 2
// Value: 3

— Conclusion

Managing our stream through the `Subscriber` protocol gives us finer control over the flow of values, since we decide how many to request and when, which opens up more options for reactive implementations.

Thanks for reading, follow me on Twitter @jullian_mercier for my latest articles!


Jullian Mercier

Senior iOS engineer. jullianmercier.com. @jullian_mercier.