Есть как минимум две проблемы с тем, как вы сохранили состояние внутри своего оператора.const { pipe, range } = rxjs; const { map, share, tap } = rxjs.operators; const custom = () => { let state = 0; return pipe( map(next => state * next), tap(_ => state += 1), share() ); }; const op = custom(); console.log("first use:"); range(1, 2).pipe(op).subscribe(n => console.log(n)); console.log("second use:"); range(1, 2).pipe(op).subscribe(n => console.log(n));
Первая проблема заключается в том, что это означает, что оператор больше не является ссылочно прозрачным. То есть, если вызов оператора заменяется возвращаемым значением оператора, поведение отличается:
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>
next
Вторая проблема - как упоминалось в другом ответе - заключается в том, что разные подписки будут получать разные значения в своих уведомлениях, поскольку состояние внутри оператора является общим.const { pipe, range } = rxjs; const { map, share, tap } = rxjs.operators; const custom = () => { let state = 0; return pipe( map(next => state * next), tap(_ => state += 1), share() ); }; const source = range(1, 2).pipe(custom()); console.log("first subscription:"); source.subscribe(n => console.log(n)); console.log("second subscription:"); source.subscribe(n => console.log(n));
Например, если наблюдаемый источник является синхронным, последовательные подписки будут видеть разные значения:
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>
custom
Тем не менее, можно написать оператор, очень похожий на ваш custom
оператор, и вести себя корректно при любых обстоятельствах. Для этого необходимо , чтобы гарантировать , что любое государство в операторе за подписку .
Транзакционный оператор - это просто функция, которая берет наблюдаемую и возвращает наблюдаемую, поэтому вы можете использовать ее defer
для обеспечения подписки, например:
const { defer, pipe, range } = rxjs;
const { map, share, tap } = rxjs.operators;
const custom = () => {
return source => defer(() => {
let state = 0;
return source.pipe(
map(next => state * next),
tap(_ => state += 1)
);
}).pipe(share());
};
const op = custom();
console.log("first use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));
console.log("second use:");
range(1, 2).pipe(op).subscribe(n => console.log(n));
const source = range(1, 2).pipe(op);
console.log("first subscription:");
source.subscribe(n => console.log(n));
console.log("second subscription:");
source.subscribe(n => console.log(n));
.as-console-wrapper { max-height: 100% !important; top: 0; }
<script src="https://unpkg.com/rxjs@6/bundles/rxjs.umd.min.js"></script>