Reactive programming — Networking with Combine and RxSwift — Part 1: Publisher and Observable

Jullian Mercier
Oct 27, 2019

RxSwift and Combine are reactive programming solutions whose purpose is to handle asynchronous events.

RxSwift is the most popular framework whereas Combine is Apple’s recently introduced built-in solution.

Working with such a different programming paradigm may be tricky at first. When it comes to networking, however, one can really feel the benefits of these solutions.

Both frameworks provide different ways of handling network requests in a declarative manner.

— Combine

import Combine
import Foundation

struct Post: Decodable {
    let userId: Int
    let id: Int
    let title: String
    let body: String
}

final class Webservice {
    private enum Error: Swift.Error {
        case invalidResponse(URLResponse)
        case invalidJSON(Swift.Error)
        case invalidData
    }

    func fetchPosts() -> AnyPublisher<[Post], Swift.Error> {
        let url = URL(
            string: "https://jsonplaceholder.typicode.com/posts"
        )!
        return URLSession.shared.dataTaskPublisher(for: url)
            // Reject anything other than an HTTP 200 response.
            .tryMap { data, response -> Data in
                guard
                    let httpResponse = response as? HTTPURLResponse,
                    httpResponse.statusCode == 200 else {
                        throw Error.invalidResponse(response)
                }
                return data
            }
            .decode(type: [Post].self, decoder: JSONDecoder())
            // Wrap decoding failures only; other errors
            // (e.g. invalidResponse) pass through unchanged.
            .mapError { error -> Swift.Error in
                if error is DecodingError {
                    return Error.invalidJSON(error)
                }
                return error
            }
            .receive(on: DispatchQueue.main)
            .eraseToAnyPublisher()
    }
}

Combine is designed around the Publisher and the Subscriber protocols which are tightly bound to each other.

A `Publisher` will emit events over time while a `Subscriber` will observe them.

A handful of operators gives the ability to process these events as they occur and deliver the expected result.
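The publisher, operator and subscriber roles described above can be sketched without any networking. This is a minimal illustration, not part of the web service: the names `numbers`, `received` and `didFinish` are made up for the example, and the publisher comes from the `publisher` property that Combine adds to sequences.

```swift
import Combine

// A publisher that emits the integers 1 through 5 and then finishes.
let numbers = (1...5).publisher

var received: [Int] = []
var didFinish = false

// Operators process the values on their way from publisher to subscriber.
let subscription = numbers
    .filter { $0 % 2 == 1 }   // keep the odd values: 1, 3, 5
    .map { $0 * 10 }          // transform them: 10, 30, 50
    .sink(
        receiveCompletion: { completion in
            if case .finished = completion { didFinish = true }
        },
        receiveValue: { value in received.append(value) }
    )

print(received, didFinish)
// prints [10, 30, 50] true
```

A sequence publisher delivers its values synchronously as soon as a subscriber is attached, which is why the results are already available on the line after `sink`.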

The `URLSession` class has a built-in `dataTaskPublisher` which, chained with a few operators, enables us to deal with the following operations:

  • Map the URL response
  • Decode the data
  • Deal with the errors
  • Specify a scheduler

The power of functional reactive programming lies in the ability to write such operations in a declarative, seemingly synchronous way while asynchronous requests are actually performed in the background.

The return type `AnyPublisher<[Post], Swift.Error>` takes two generic types: an output type, here Array<Post>, and a failure type, here Swift.Error.

Using `Never` as an error type would mean the Publisher cannot error out.

Incoming errors would then have to be handled, for instance with the `replaceError` operator, to provide the requested return type.
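As a rough sketch of that idea, the snippet below turns a failing publisher into one whose failure type is `Never`. `SampleError`, `failing` and `safe` are hypothetical names used only for this illustration, and the built-in `Fail` publisher stands in for a request that goes wrong.

```swift
import Combine

// Hypothetical error type, used only for this illustration.
struct SampleError: Error {}

// A publisher that fails immediately instead of delivering any titles.
let failing = Fail<[String], SampleError>(error: SampleError())

// replaceError(with:) swaps the failure for a fallback value, so the
// failure type of the resulting publisher becomes Never.
let safe: AnyPublisher<[String], Never> = failing
    .replaceError(with: [])
    .eraseToAnyPublisher()

var titles: [String]? = nil

// The value-only variant of sink is available because Failure == Never.
let subscription = safe.sink { titles = $0 }

print(titles ?? ["<nothing received>"])
// prints []
```

The trade-off is that the subscriber can no longer tell an empty result from a failed request, so this approach fits best when a sensible default value exists.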

Let’s look at some other operators.

  • tryMap() « transforms all elements from the upstream publisher with a provided error-throwing closure. »
  • decode() decodes the upstream output into the specified type with a given decoder (e.g. JSONDecoder).
  • mapError() « converts any failure from the upstream publisher into a new error. »

let webService = Webservice()
let posts = webService.fetchPosts()
var cancellable = Set<AnyCancellable>()

The `posts` publisher needs to be tied to a subscriber to retrieve the events.

The `sink` method comes in handy as it « creates the subscriber and immediately requests an unlimited number of values, prior to returning the subscriber. »

posts.sink(receiveCompletion: { completion in
    print(completion)
}) { posts in
    print(posts.count)
}.store(in: &cancellable)
// prints 100
// prints finished

The Publisher emits a hundred posts and then terminates (i.e. with a finished event). As Apple’s documentation puts it, « a publisher continues to emit elements until it completes normally or fails ».

Using the `Sink` class from the `Subscribers` namespace enables us to create and store the subscriber.

let subscriber = Subscribers.Sink<[Post], Swift.Error>(receiveCompletion: { completion in
    print(completion)
}) { posts in
    print(posts.count)
}

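The subscriber needs to be attached to its publisher using the publisher’s `subscribe` method. Apple’s documentation recommends calling it rather than `receive(subscriber:)`, which is the requirement that publishers implement.

posts.subscribe(subscriber)
// prints 100
// prints finished
subscriber.store(in: &cancellable)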

The `sink` method returns `AnyCancellable`, a type-erased object, enabling the framework to automatically cancel a subscription at deinit.

Storing the cancellable object in a `Set` using the `store` method enables Combine to effectively manage the memory while avoiding retain cycles.
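The effect of releasing a stored cancellable can be observed with a small sketch. It is independent of the web service and uses a `PassthroughSubject` so that values can be sent by hand; the names `subject`, `cancellables` and `received` are chosen for this example only.

```swift
import Combine

// A subject lets us push values manually.
let subject = PassthroughSubject<Int, Never>()
var cancellables = Set<AnyCancellable>()
var received: [Int] = []

subject
    .sink { received.append($0) }
    .store(in: &cancellables)

subject.send(1)            // delivered: the subscription is alive
cancellables.removeAll()   // the AnyCancellable is released and cancels itself
subject.send(2)            // not delivered: the subscription is gone

print(received)
// prints [1]
```

In an app, the set is typically a property of a view model or view controller, so the subscriptions end when that owner is deallocated.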

— RxSwift

RxSwift is designed around the ObservableType and the ObserverType protocols.

Just like in Combine, an Observable emits events over time and an Observer listens to them.

The framework, precisely its RxCocoa component, provides a way to use `URLSession` in a reactive way through its `rx` extension.

URLSession.shared.rx.response(request: request)

The following `fetch()` method returns an Observable of the specified type, here Array<Post>.

import Foundation
import RxCocoa
import RxSwift

final class Webservice {
    let disposeBag = DisposeBag()

    private enum Error: Swift.Error {
        case invalidResponse(URLResponse?)
        case invalidJSON(Swift.Error)
        case invalidData // used by the `create`-based variant of fetch()
    }

    func fetch() -> Observable<[Post]> {
        let url = URL(
            string: "https://jsonplaceholder.typicode.com/posts"
        )!
        let request = URLRequest(url: url)
        return URLSession.shared.rx.response(request: request)
            // Reject anything other than an HTTP 200 response.
            .map { result -> Data in
                guard result.response.statusCode == 200 else {
                    throw Error.invalidResponse(result.response)
                }
                return result.data
            }
            .map { data -> [Post] in
                do {
                    let posts = try JSONDecoder().decode(
                        [Post].self, from: data
                    )
                    return posts
                } catch let error {
                    throw Error.invalidJSON(error)
                }
            }
            // RxSwift 5 spelling; RxSwift 6 renames it to observe(on:).
            .observeOn(MainScheduler.instance)
            .asObservable()
    }
}

The RxSwift equivalent to Combine’s `sink` is the `subscribe` function, which « subscribes an element handler, an error handler, a completion handler and disposed handler to an observable sequence ». In other words, it is the way to make a subscription to an observable sequence.

let webService = Webservice()

webService.fetch().subscribe(onNext: { posts in
    print(posts.count)
}, onError: { error in
    print(error)
}).disposed(by: webService.disposeBag)

The `subscribe` method returns a `Disposable` type that enables the framework to dispose (cancel) a subscription at deinit.

The `DisposeBag` class holds a reference to each subscription and disposes of them when it is deallocated.
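A small sketch can make this behaviour visible. It assumes the RxSwift package is available and uses a `PublishSubject` so that events can be sent by hand; `subject`, `disposeBag` and `received` are names chosen for the illustration.

```swift
import RxSwift

// A subject lets us push events manually.
let subject = PublishSubject<Int>()
var disposeBag = DisposeBag()
var received: [Int] = []

subject
    .subscribe(onNext: { received.append($0) })
    .disposed(by: disposeBag)

subject.onNext(1)           // delivered: the subscription is alive

// Replacing the bag deallocates the old one, which disposes its subscriptions.
disposeBag = DisposeBag()

subject.onNext(2)           // not delivered: the subscription was disposed

print(received)
// prints [1]
```

Replacing the bag with a fresh one is a common way to drop every subscription at once, for example when a reusable cell is about to be reused.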

RxSwift provides another way to create and return an Observable using the static func `create` from the ObservableType protocol extension.

func fetch() -> Observable<[Post]> {
    return Observable.create { observer in
        let url = URL(
            string: "https://jsonplaceholder.typicode.com/posts"
        )!
        let session = URLSession.shared
        let dataTask = session.dataTask(with: url) {
            (data, response, error) in
            guard
                let httpResponse = response as? HTTPURLResponse,
                httpResponse.statusCode == 200 else {
                    observer.onError(
                        Error.invalidResponse(response)
                    )
                    return
            }
            guard let data = data else {
                observer.onError(Error.invalidData)
                return
            }
            do {
                let posts = try JSONDecoder().decode(
                    [Post].self, from: data
                )
                observer.onNext(posts)
                observer.onCompleted()
            } catch let error {
                // Wrapped for consistency with the first fetch() variant.
                observer.onError(Error.invalidJSON(error))
            }
        }

        dataTask.resume()

        // Cancel the request if the subscription is disposed early.
        return Disposables.create {
            dataTask.cancel()
        }
    }
}

Inside the block, the `observer` argument is used to emit events through its `onNext`, `onError` and `onCompleted` methods.

A disposable is returned so the reference to the subscription can be properly released from memory at deinit.

Although the Rx and Combine implementations using Publisher and Observable are well suited to processing a stream of values, both frameworks provide variations of them that are particularly adapted to networking requests. Meet Future and Single.

Continue to part 2: Future and Single
