Combine: Publisher sometimes loses value and completes

I have a simple Deferred publisher that reads data from disk, and I display the data in a SwiftUI List. The publisher works well most of the time, but sometimes it loses its value (an array of Model objects) and completes with a finished message. I tried a workaround mentioned here: using the buffer operator to hold the value, because I believe that a Combine publisher by design won't pass data downstream when subscribers have requested no demand, so the data is dropped and the publisher completes. However, using buffer didn't solve the issue.

The code I have:

enum FileError: Error {
    case someError
}

class ViewModel: ObservableObject {
    @Published var modelArray = [Model]()
    private var subscriptions = Set<AnyCancellable>()
    func readData() {
        DataSource()
            .readFromBundle(resource: "Sample", type: "json")
            .receive(on: DispatchQueue.main)
            .sink(receiveCompletion: { completion in
                print("Completion: \(completion)")
            }, receiveValue: { array in
                self.modelArray = array
            })
            .store(in: &subscriptions)
    }
}
struct ContentView: View {
    @ObservedObject var viewModel: ViewModel

    var body: some View {
        VStack {
            List(self.viewModel.modelArray) { model in
                Text("\(model.name)")
            }
        }
        .onAppear {
            self.viewModel.readData()
        }
    }
}

struct Model: Codable, Identifiable {
    var id: Int
    var name: String
}

class DataSource {
    private let readQueue = DispatchQueue(label: "ReadQueue", qos: .default, attributes: .concurrent)

    func readFromBundle(resource: String, type: String) -> AnyPublisher<[Model], FileError> {
        Deferred {
            Future { promise in
                guard let url = Bundle.main.url(forResource: "Sample", withExtension: "json"),
                      let data = try? Data(contentsOf: url),
                      let modelArray = try? JSONDecoder().decode([Model].self, from: data)
                else {
                    promise(.failure(.someError))
                    return
                }
                promise(.success(modelArray))
            }
        }
        .receive(on: self.readQueue)
        .eraseToAnyPublisher()
    }
}

This is a link to download a working sample project.

EDIT:

Environment: Xcode 11.3.1, iOS 13.3 iPhone 11 Pro Max simulator and device.

gif screenshot (notice the console output)

EDIT2:

If I add any downstream operator, such as combineLatest, just before sink in the consumer function readData(), a new behavior appears: chaining an async publisher (readFromBundle) with a sync publisher (combineLatest) results in the value not being delivered at all on iOS 13.3+ devices, and only sometimes being delivered on devices below iOS 13.3, as stated in this link.


This looks like a race condition. Please try the following (suggested from reading the code only):

1) use background queue explicitly

private let readQueue = DispatchQueue(label: "ReadQueue", qos: .background, 
    attributes: .concurrent)

2) schedule Publisher on this queue instead of receiving on it

.subscribe(on: self.readQueue)
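
To see the effect of point 2 in isolation, here is a minimal sketch that is not taken from the question's project. The queue label "WorkQueue", the [1, 2, 3] payload, and the semaphore are assumptions for illustration only; the semaphore just lets the snippet run as a command-line script, and a serial queue is used to keep the sketch simple. It shows that with subscribe(on:) the Deferred/Future body itself runs off the main thread.

```swift
import Combine
import Foundation

// Hypothetical background queue standing in for `readQueue`.
let workQueue = DispatchQueue(label: "WorkQueue")

var ranOnMainThread: Bool? = nil
var received: [Int] = []
let done = DispatchSemaphore(value: 0)

let cancellable = Deferred {
    Future<[Int], Never> { promise in
        // Deferred creates the Future at subscription time, and
        // subscribe(on:) performs the subscription on `workQueue`,
        // so this closure (the "disk read") runs on that queue.
        ranOnMainThread = Thread.isMainThread
        promise(.success([1, 2, 3]))
    }
}
.subscribe(on: workQueue)
.sink(receiveCompletion: { _ in
    done.signal()            // completion arrives after the value
}, receiveValue: { value in
    received = value
})

done.wait()                  // block the script until the pipeline finishes
print(received, ranOnMainThread as Any)
```

In an app you would normally follow subscribe(on:) with receive(on: DispatchQueue.main) before updating UI state, instead of blocking on a semaphore.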

Let's look at the documentation for .receive(on:):

Specifies the scheduler on which to receive elements from the publisher.

Declaration

func receive<S>(on scheduler: S, options: S.SchedulerOptions? = nil) -> Publishers.ReceiveOn<Publishers.SubscribeOn<Deferred<Future<[Model], FileError>>, DispatchQueue>, S> where S : Scheduler

Discussion

You use the receive(on:options:) operator to receive results on a specific scheduler, such as performing UI work on the main run loop. In contrast with subscribe(on:options:), which affects upstream messages, receive(on:options:) changes the execution context of downstream messages. In the following example, requests to jsonPublisher are performed on backgroundQueue, but elements received from it are performed on RunLoop.main.

let jsonPublisher = MyJSONLoaderPublisher() // Some publisher.
let labelUpdater = MyLabelUpdateSubscriber() // Some subscriber that updates the UI.

jsonPublisher
    .subscribe(on: backgroundQueue)
    .receive(on: RunLoop.main)
    .subscribe(labelUpdater)

Parameters

scheduler: The scheduler the publisher is to use for element delivery.
options: Scheduler options that customize the element delivery.

Returns

A publisher that delivers elements using the specified scheduler.

In your case, that means:

import SwiftUI
import Combine

enum FileError: Error {
    case someError
}

class ViewModel: ObservableObject {
    @Published var modelArray = [Model]()
    private var subscriptions = Set<AnyCancellable>()
    func readData() {
        DataSource()
            .readFromBundle(resource: "Sample", type: "json")
            .sink(receiveCompletion: { completion in
                print("Completion: \(completion)")
            }, receiveValue: { array in
                print("received value")
                self.modelArray = array
            })
            .store(in: &subscriptions)
    }
}
struct ContentView: View {
    @ObservedObject var viewModel: ViewModel

    var body: some View {
        VStack {
            List(self.viewModel.modelArray) { model in
                Text("\(model.name)")
            }
        }
        .onAppear {
            self.viewModel.readData()
        }
    }
}

struct Model: Codable, Identifiable {
    var id: Int
    var name: String
}

class DataSource {
    private let readQueue = DispatchQueue(label: "ReadQueue", qos: .default, attributes: .concurrent)

    func readFromBundle(resource: String, type: String) -> AnyPublisher<[Model], FileError> {
        Deferred {
            Future { promise in
                // Use the parameters rather than the hard-coded "Sample"/"json".
                guard let url = Bundle.main.url(forResource: resource, withExtension: type),
                      let data = try? Data(contentsOf: url),
                      let modelArray = try? JSONDecoder().decode([Model].self, from: data)
                else {
                    promise(.failure(.someError))
                    return
                }
                promise(.success(modelArray))
            }
        }
        .subscribe(on: readQueue)    // perform the disk read on the background queue
        .receive(on: RunLoop.main)   // deliver the value and completion on the main run loop
        .eraseToAnyPublisher()
    }
}

This explains why Asperi's solution works. The only difference is that it is no longer necessary to call .receive(on:) again in readData().

The difference between DispatchQueue.main and RunLoop.main is not significant in your example.

The first run does not fail; it just needs some time to load the data. You can check this by adding a print statement right before the promise is fulfilled:

print("ready")
promise(.success(modelArray)) 

Then set a breakpoint on the "not loaded yet" line in the view code at the end of this answer, and you will see that "not loaded yet" appears before "ready" is printed in the console. The publisher is not dropping the value.

As the name onAppear() says, it is called once the UI is shown, so the view can be displayed before the data has loaded:

if self.viewModel.modelArray.isEmpty {
    Text("not loaded yet")
} else {
    List(self.viewModel.modelArray) { model in
        Text("\(model.name)")
    }
}
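
The ordering described above can also be reproduced outside SwiftUI. The following is a minimal sketch under stated assumptions: the queue label "LoaderQueue", the `events` log, and both semaphores are hypothetical helpers that are not part of the question's code. The `gate` semaphore only makes the ordering deterministic for the demonstration; it stands in for the time the disk read takes.

```swift
import Combine
import Foundation

// Hypothetical serial queue standing in for the background read queue.
let loaderQueue = DispatchQueue(label: "LoaderQueue")

var events: [String] = []
let gate = DispatchSemaphore(value: 0)      // simulates a slow disk read
let finished = DispatchSemaphore(value: 0)  // lets the script wait for completion

let cancellable = Deferred {
    Future<[String], Never> { promise in
        gate.wait()                          // "loading" until the caller has moved on
        events.append("ready")
        promise(.success(["a", "b"]))
    }
}
.subscribe(on: loaderQueue)
.sink(receiveCompletion: { _ in
    finished.signal()
}, receiveValue: { _ in
    events.append("received value")
})

// sink(...) has already returned here, but no value has been delivered yet.
events.append("not loaded yet")
gate.signal()
finished.wait()

print(events)  // ["not loaded yet", "ready", "received value"]
```

The value is not lost; it simply arrives after the code that subscribed has already continued, which is why the view needs a state for "no data yet".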
