RxSwift: Multiple subscribers for a unit of work

December 8, 2018

Or, "how to not make unnecessary requests with a URLSession subscriber".

I've been working on a side project for better CIFilter documentation lately, and I decided this was as good a time as any to try out RxSwift. Even though reactive programming is becoming more and more common in native app development, I've never had a chance to really dig into it, so I started by going through RxSwift's intro documentation.

Multiple Subscribers

One thing that wasn't immediately intuitive to me about Observables is that any time subscribe is called, the observable's creation block gets called again, even for Observables which perform asynchronous work. Take a simple observable that generates integers as an example:

func generateRandomIntAfterDelay() -> Observable<Int> {
    return Observable.create { observer in
        print("Processing request...")
        DispatchQueue.main.asyncAfter(deadline: DispatchTime.now() + 2) {
            print("Generating random int...")
            observer.onNext(Int.random(in: 0 ..< 10))
            observer.onCompleted()
        }
        return Disposables.create()
    }
}

If you have multiple subscribers to this observable, that means multiple units of work will get enqueued, and multiple random ints will get returned. I.e., if our subscription code looks like this:

let observable = generateRandomIntAfterDelay()
observable.subscribe(onNext: { randInt in
    print("1: Got \(randInt)")
})
observable.subscribe(onNext: { randInt in
    print("2: Got \(randInt)")
})

Then the output will look like this:

Processing request...
Processing request...
Generating random int...
1: Got 7
Generating random int...
2: Got 2

For some reason, I had assumed that two subscriptions to an async observable would both receive the same result from a single unit of work. That's just not how observable subscriptions work: each call to subscribe runs the creation block again and gets its own result.
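
To see the re-execution without waiting on a timer, here is a minimal sketch that swaps the delayed work for a synchronous counter. The names creationCount, cold, and received are made up for this illustration; only Observable.create and subscribe come from RxSwift.

```swift
import RxSwift

// Hypothetical counter: incremented every time the creation block runs.
var creationCount = 0

let cold = Observable<Int>.create { observer in
    creationCount += 1
    // Emit the run number so each subscriber can tell which run it saw.
    observer.onNext(creationCount)
    observer.onCompleted()
    return Disposables.create()
}

var received: [Int] = []
_ = cold.subscribe(onNext: { received.append($0) })
_ = cold.subscribe(onNext: { received.append($0) })

// The block ran once per subscription, and each subscriber saw a different value.
print(creationCount) // 2
print(received)      // [1, 2]
```

Because the source here emits synchronously, each subscribe call finishes its own run before the next one starts, which makes the one-run-per-subscriber behavior easy to check.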

Observable URLSession Tasks

This situation gets a little trickier when you have an observable which performs real work, like making HTTP requests:

extension Reactive where Base: URLSession {
    public func response(urlString: String) -> Observable<Data> {
        return Observable.create { observer in
            let task = self.base.dataTask(
                with: URLRequest(url: URL(string: urlString)!)
            ) { (data, response, error) in
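                guard let data = data else {
                    // Report the failure so subscribers aren't left waiting forever.
                    observer.onError(error ?? URLError(.unknown))
                    return
                }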
                observer.onNext(data)
                observer.onCompleted()
            }
            task.resume()
            return Disposables.create {
                task.cancel()
            }
        }
    }
}

func makeRequest() {
    let observable = URLSession.shared.rx.response(
        urlString: "https://noahgilmore.com"
    )
    observable.subscribe(onNext: { data in
        print("Got the data!")
    })
    observable.subscribe(onNext: { data in
        print("Got the data again!")
    })
}

One call to makeRequest() will make two HTTP requests and get back two separate data objects. I started to wonder about how you would correctly implement multiple subscribers for a single HTTP request - it turns out an easy way is to use ConnectableObservable, an "observable sequence that shares a single subscription to the underlying sequence"[1]. Calling publish() on any observable returns a ConnectableObservable which mirrors it, and you can start the sequence with .connect(). This means you can implement multiple subscriptions for a single unit of work like this:

func makeRequest() {
    let observable = URLSession.shared.rx.response(
        urlString: "https://noahgilmore.com"
    )
    let connectableObservable = observable.publish()
    connectableObservable.subscribe(onNext: { data in
        print("Got the data!")
    })
    connectableObservable.subscribe(onNext: { data in
        print("Got the data again!")
    })

    // Start the request
    connectableObservable.connect()
}

This code makes only one HTTP request and delivers the result to both callbacks. 👌
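
The same behavior can be checked without touching the network. The sketch below assumes a synchronous stand-in for the request (source, workCount, and log are hypothetical names) and counts how many times the creation block runs when two subscribers share one publish() connection.

```swift
import RxSwift

// Hypothetical stand-in for the HTTP request: counts how often the work runs.
var workCount = 0

let source = Observable<String>.create { observer in
    workCount += 1
    observer.onNext("payload")
    observer.onCompleted()
    return Disposables.create()
}

let shared = source.publish()
var log: [String] = []

_ = shared.subscribe(onNext: { log.append("A: \($0)") })
_ = shared.subscribe(onNext: { log.append("B: \($0)") })

// Subscribing to a ConnectableObservable does not start the work.
let countBeforeConnect = workCount

// connect() subscribes to the source once and forwards events to both subscribers.
_ = shared.connect()

print(countBeforeConnect) // 0
print(workCount)          // 1
print(log.count)          // 2
```

Both subscribers receive the same "payload" value from a single run of the creation block.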

This isn't the only time .publish() is useful though. As asked in this Stack Overflow question, you sometimes want to kick off a long-running piece of work before you actually subscribe to an observable. For example:

func longRunningTask() -> Observable<Result> {
    return Observable.create { observer in
        doSomeLongRunningWork { result in
            observer.onNext(result)
            observer.onCompleted()
        }
        // The creation block has to return a Disposable.
        return Disposables.create()
    }
}

func startTask() {
    let connectable = longRunningTask().publish()
    connectable.connect() // start the task

    // Do more setup...

    connectable.subscribe(onNext: { result in
        // process the result
    })
}
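
One caveat worth keeping in mind with this pattern: publish() does not buffer, so anything emitted between connect() and subscribe is not delivered to the late subscriber. The sketch below uses a PublishSubject as a hypothetical stand-in for the long-running task so the timing is explicit, and compares publish() with replay(1), which keeps the most recent element for subscribers that arrive while the connection is still live. The names task, fromPublish, and fromReplay are assumptions for this example.

```swift
import RxSwift

// Hypothetical stand-in for the long-running task; we push values into it by hand.
let task = PublishSubject<Int>()

let published = task.publish()
let replayed = task.replay(1)

// Start both connections before anyone subscribes.
_ = published.connect()
_ = replayed.connect()

// This value is emitted while there are no subscribers yet.
task.onNext(1)

var fromPublish: [Int] = []
var fromReplay: [Int] = []
_ = published.subscribe(onNext: { fromPublish.append($0) })
_ = replayed.subscribe(onNext: { fromReplay.append($0) })

// This value is emitted after both subscribers are attached.
task.onNext(2)

print(fromPublish) // [2]
print(fromReplay)  // [1, 2]
```

If the work can finish before the subscriber is attached, it is worth checking how the connectable behaves after the source completes in your RxSwift version, since this sketch only covers a source that is still running.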

Hopefully this helps explain an unintuitive concept for others who are starting out with reactive programming. If you know a better way or a best practice to do this sort of thing, let me know! For reference, I've implemented these examples on GitHub.