Combine — share() and multicast()

Jullian Mercier
3 min readJan 2, 2020

Combine is designed around structs — which are value types — ensuring that a copy is made by the system whenever a resource is stored in a property (or passed around in functions) so it can deliver values without side-effects.

extension Publishers {/// A publisher that publishes a given sequence of elements.
/// When the publisher exhausts the elements in the sequence, the next request causes the publisher to finish.
public struct Sequence<Elements, Failure> : Publisher where Elements : Sequence, Failure : Error { {…}
}
}

When creating multiple subscribers, a copy of the publisher will be created and the values will start flowing for each one of them.

let numbers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
.publisher
.print()
numbers
.sink(receiveValue: { _ in })
.store(in: &cancellables)
numbers
.sink(receiveValue: { _ in })
.store(in: &cancellables)

Let’s check the console output to see both distinct subscriptions.

// First subscriptionreceive subscription: ([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
request unlimited
receive value: (1)
{..}
receive value: (10)
receive finished// Second subscriptionreceive subscription: ([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
request unlimited
receive value: (1)
{..}
receive value: (10)
receive finished

The weakness of such a copying process is in the resource-intensive operations — e.g network requests — that may lead to poor performance since outcomes will be duplicated rather than shared.

Combine provides with the `share()` operator that enables us to obtain a publisher by reference rather than by value.

`share()` returns a `Publishers.Share<Self>` type which is a class.

/// A publisher implemented as a class, which otherwise behaves like its upstream publisher.final public class Share<Upstream> : Publisher, Equatable where Upstream : Publisher

Implementing a network request using the `share()` operator will result in only one subscription being made to the upstream publisher since a unique resource will be shared to the downstream subscribers.

let url = URL(string: "https://jsonplaceholder.typicode.com/posts")!
let posts = URLSession.shared.dataTaskPublisher(for: url)
.map { $0.data }
.decode(type: [Post].self, decoder: JSONDecoder())
.replaceError(with: [])
.print()
.share()
posts
.sink(receiveValue: {
print(“subscription1 value: \($0)”) })
.store(in: &cancellables)
posts
.sink(receiveValue: {
print("subscription2 value: \($0)") })
.store(in: &cancellables)

Let’s check the console output to see the unique subscription.

receive subscription: (ReplaceError)
request unlimited
receive value: {..}
subscription1 value: {..}
subscription2 value: {..}
receive finished

The process is straightforward —

  1. The first subscriber subscribes to the posts publisher triggering the receive subscription event.
  2. Through the sink() method, an unlimited amount of values is requested.
  3. The value starts flowing.
  4. First subscriber receives the value from the resource.
  5. Second subscriber receives the value from the same resource.
  6. The publisher’s job is done.. a completion event is sent.

Try removing the `share()` operator and you will notice the exact same events occurring twice (since it is copied).

The `share()` operator does not involve any buffering system which means that if the second subscription occurs after the request has completed, it will only receive the completion event.

Unfortunately, Combine doesn’t provide with a `shareReplay()` operator such as in its RxSwift counterpart however we could use the `multicast()` operator to address this issue.

The `multicast()` operator uses a `ConnectablePublisher` type which provides a `connect()` method to trigger the publisher once all of the subscribers are setup.

A subject must be provided to deliver elements to the multiple subscribers.

let url = URL(string: "https://jsonplaceholder.typicode.com/posts")!
let postsSubject = PassThroughSubject<[Post], Never>()
let posts = URLSession.shared.dataTaskPublisher(for: url)
.map { $0.data }
.decode(type: [Post].self, decoder: JSONDecoder())
.replaceError(with: [])
.print()
.multicast(subject: postsSubject)
posts
.sink(receiveValue: {
print("subscription1 value: \($0)") })
.store(in: &cancellables)
DispatchQueue.main.asyncAfter(deadline: .now() + 2) {
posts
.sink(receiveValue: { print("subscription2 value: \($0)") })
.store(in: &self.cancellables)
}

At this point, nothing happens since we didn’t connect our subscribers to the upstream publisher.

Let’s fix it by adding the following code right after the second subscription in the `DispatchQueue` block.

posts
.connect()
.store(in: &self.cancellables)

Let’s check the console output to see both subscriptions receiving the requested value.

receive subscription: (ReplaceError)
request unlimited
receive value: {..}
subscription1 value: {..}
subscription2 value: {..}
receive finished

The `multicast()` operator is useful when sharing a single resource with multiple subscribers while keeping the actual network request implementation private.

There is a nice alternative to create a connectable wrapper around a publisher using the `makeConnectable()` method which will turn instantly your publisher into a `ConnectablePublisher` without having to deal with a subject.

Remove the `postsSubject` property and replace the `multicast(subject:)` operator by `makeConnectable()`.

It works just as fine as in the previous implementation.

Conclusion

Combine enables us to manage efficiently our resources through operators such as share() and multicast() with its connectable publishers giving us the power to improve our app performance when necessary.

Follow me on Twitter for my latest articles.

--

--

Jullian Mercier

Senior iOS engineer. jullianmercier.com. @jullian_mercier. Currently looking for new job opportunities.