Ваш наблюдаемый не горячий. Это холодный наблюдаемый с общим источником, и это только заставляет последующих наблюдателей вести себя так, как будто они получили горячий наблюдаемый. Это, вероятно, лучше всего описывается как теплое наблюдение.
Давайте посмотрим на пример:
Observable.Using
Когда я запускаю это, я получаю:
A В С В С Е Е Е
Наблюдатели «В» и «С» пропускают первое значение последовательности.
И после того, как наблюдатели «А», «В» и «С» закончены, последовательность завершена, поэтому «D» никогда не получает значения. Я должен был создать совершенно новый наблюдаемый, чтобы отобразить значения «E».
Таким образом, в вашем коде у вас есть проблема, если первый наблюдатель заканчивает одно или несколько значений до того, как второй и третий подписываются, тогда эти наблюдатели пропускают значения. Это то, что вы хотите?
Тем не менее, ваш вопрос задается вопросом о том, как справляться с одноразовыми ценностями, полученными от наблюдаемого. Это просто, если вы используете public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler) { return Observable .Range(0, 3) .ObserveOn(Scheduler.Default) .SelectMany(x => Observable .Using( () => Disposable.Create(() => Console.WriteLine("Disposed!")), y => Observable.Return(y))) .Publish() .RefCount(); }
.
Вот аналогичная ситуация с вашим кодом:
var query = ImagesInFolder(Scheduler.Default);
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("B"); });
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("C"); });
Thread.Sleep(10000);
query.Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("D"); });
Теперь, если я запустил этот код:
public static IObservable<System.Drawing.Bitmap> ImagesInFolder(string path, IScheduler scheduler)
{
return
Directory
.GetFiles(path, "*.bmp")
.ToObservable(scheduler)
.SelectMany(x =>
Observable
.Using(
() => new System.Drawing.Bitmap(x),
bm => Observable.Return(bm)))
.Publish()
.RefCount();
}
Я получаю этот вывод:
A В С public static IConnectableObservable <System.Drawing.Bitmap> ImagesInFolder (строковый путь, планировщик IScheduler) { вернуть каталог .GetFiles (путь, "* .bmp"). .ToObservable (планировщик) .SelectMany (x => наблюдаемый .С помощью( () => новый System.Drawing.Bitmap (x), bm => Наблюдаемый. Возврат (bm))) .Публиковать(); } В С Disposed! В С Disposed!
Опять же, «D» никогда не производит никаких значений - и возможно, что «B» и «C» пропускают значения, но это показывает, как вернуть наблюдаемое значение, которое автоматически удаляется с помощью наблюдателя / s, закончено.
Ваш код будет выглядеть так:
public void Main()
{
var images = ImagesInFolder("c:UsersVASIYADesktopSample Images", TaskPoolScheduler.Instance);
var process1 = images.Subscribe(SaveBwImages);
var process2 = images.Subscribe(SaveScaledImages);
var process3 = images.Select(Cats).Subscribe(SaveCatsImages);
images.Connect();
}
Тем не менее, вы все еще находитесь в стране, возможно, отсутствующих значений.
Поэтому вам действительно нужно это сделать:
.Publish().RefCount()
Затем вы называете это следующим образом:
void Main()
{
ImagesInFolder(Scheduler.Default)
.Publish(iif =>
Observable
.Merge(
iif.Select(x => { Thread.Sleep(1000); Console.WriteLine("A"); return "A"; }),
iif.Select(x => { Thread.Sleep(3000); Console.WriteLine("B"); return "B"; }),
iif.Select(x => { Thread.Sleep(2000); Console.WriteLine("C"); return "C"; })))
.Subscribe();
}
public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler)
{
return
Observable
.Range(0, 3)
.ObserveOn(Scheduler.Default)
.SelectMany(x =>
Observable
.Using(
() => Disposable.Create(() => Console.WriteLine("Disposed!")),
y => Observable.Return(y)));
}
Другой вариант заключается в том, чтобы удалить весь Disposed!
код и убедиться, что вы делаете это правильно самостоятельно, когда подписываетесь.
Попробуйте этот код:
.Publish()
Я понимаю:
A В С Disposed! В С Disposed! В С Disposed!
Опять же, один Disposed!
за каждым наблюдателем запущен, но теперь проблема заключается в том, что я изменил задержку в обработке каждого наблюдателя, но код все еще выводит порядок, в который были добавлены наблюдатели. Проблема в том, что Rx запускает каждого наблюдателя последовательно, и каждое полученное значение находится в последовательности.
Я ожидаю, что вы подумали, что можете использовать параллельную обработку .Publish()
. Вы этого не сделаете.
Способ заставить это работать параллельно - это void Main() { ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(1000); Console.WriteLine("A"); }); ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(3000); Console.WriteLine("B"); }); ImagesInFolder(Scheduler.Default).Subscribe(x => { Thread.Sleep(2000); Console.WriteLine("C"); }); } public static IObservable<IDisposable> ImagesInFolder(IScheduler scheduler) { return Observable .Range(0, 3) .ObserveOn(Scheduler.Default) .SelectMany(x => Observable .Using( () => Disposable.Create(() => Console.WriteLine("Disposed!")), y => Observable.Return(y))); }
полностью отказаться .
Просто делайте такие вещи:
IDisposable
Теперь я получаю следующее:
A Disposed! С Disposed! Disposed! В Disposed! Disposed! С Disposed! С Disposed! В Disposed! В Disposed!
Код теперь работает параллельно и заканчивается как можно быстрее - и правильно распоряжается, A±
когда подписка заканчивается. Вы просто не получаете радость от обмена одним доступным ресурсом с каждым наблюдателем, но вы также не получаете все поведенческие проблемы.