RxJS Observable stretch

I have an Rx.Observable.webSocket Subject. My server endpoint cannot handle messages that arrive at almost the same time (<25 ms apart). I need a way to spread out the next() calls on my websocket subject.

I have created another Subject, requestSubject, and subscribed to it. Inside the subscription I call next on the websocket subject.

requestSubject.delay(1000).subscribe((request) => {
  console.log(`SENDING: ${JSON.stringify(request)}`);
  socketServer.next(JSON.stringify(request));
});

Using delay shifts every next call by the same amount, so calls that arrived together are still emitted together, only later ... that's not what I want.

I tried delay, throttle, and debounce, but none of them fit.

The following diagram illustrates my problem:

Stream 1 | ---1-------2-3-4-5---------6----

    after some operation ...

Stream 2 | ---1-------2----3----4----5----6-
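
To make the diagram concrete, the desired timing can be written as plain arithmetic: each value goes out either when it arrives or one minimum gap after the previous send, whichever is later. The sketch below is only an illustration of that rule, not RxJS code; the helper name `stretchTimes` and the arrival times are made up for the example.

```javascript
// Hypothetical helper, not part of RxJS: computes when each message would be
// forwarded if consecutive sends must be at least `minGapMs` apart.
function stretchTimes(arrivalTimesMs, minGapMs) {
  const emitTimes = [];
  let previousEmit = -Infinity; // nothing has been sent yet
  for (const arrival of arrivalTimesMs) {
    // Send right away if the gap is already large enough, otherwise wait
    // until `minGapMs` after the previous send.
    const emit = Math.max(arrival, previousEmit + minGapMs);
    emitTimes.push(emit);
    previousEmit = emit;
  }
  return emitTimes;
}

// Made-up arrival times in ms: messages 3 to 5 arrive in a burst.
console.log(stretchTimes([0, 100, 105, 110, 115, 300], 25));
// → [ 0, 100, 125, 150, 175, 300 ]
```

Messages 1, 2 and 6 keep their original timing; only the burst is spread out, as in Stream 2.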

Had to tinker a bit; it's not as easy as it looks:

//example source stream
const source = Rx.Observable.from([100,500,1500,1501,1502,1503])
  .mergeMap(i => Rx.Observable.of(i).delay(i))
  .share();

stretchEmissions(source, 1000)
  .subscribe(val => console.log(val));

function stretchEmissions(source, spacingDelayMs) {
  return source
    .timestamp()
    .scan((acc, curr) => {
      // calculate delay needed to offset next emission
      let delay = 0;
      if (acc !== null) {
        const timeDelta = curr.timestamp - acc.timestamp;
        delay = timeDelta > spacingDelayMs ? 0 : (spacingDelayMs - timeDelta);
      }
  
      return {
        timestamp: curr.timestamp,
        delay: delay,
        value: curr.value
      };
    }, null)
    // third argument: concurrency of 1, so delayed values are emitted one after another, in order
    .mergeMap(i => Rx.Observable.of(i.value).delay(i.delay), undefined, 1);
}
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.4.2/Rx.js"></script>

Here are two solutions: one using a custom stream and one using only RxJS operators. Since the operator-only version looks quite complicated, I would not advise using it; use the custom stream instead (see the first example below):

Custom stream (MUCH easier to read and maintain, probably with better performance as well):

const click$ = Rx.Observable
  .fromEvent(document.getElementById("btn"), "click")
  .map((click, i) => i);

const spreadDelay = 1000;
let prevEmitTime = 0;

click$
  .concatMap(i => {  // in this case you could also use "flatMap" or "mergeMap" instead of "concatMap"
    const now = Date.now();
    if (now - prevEmitTime > spreadDelay) {
      prevEmitTime = now;
      return Rx.Observable.of(i); // emit immediately
    } else {
      prevEmitTime += spreadDelay;
      return Rx.Observable.of(i).delay(prevEmitTime - now); // emit somewhere in the future
    }
  })
  .subscribe((request) => {
    console.log(`SENDING: ${request}`);
  });
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>
<button id="btn">Click me!</button>
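
The bookkeeping around prevEmitTime is easier to follow with concrete numbers. The following is a rough, RxJS-free model of the snippet above, assuming the projection function runs at click time (as it would with mergeMap); the function name `planEmissions` and the click times are hypothetical, and `now` is passed in rather than read from Date.now().

```javascript
// Hypothetical model of the custom stream: same branching as the snippet,
// but with click times supplied explicitly so the result is reproducible.
function planEmissions(clickTimesMs, spreadDelay) {
  let prevEmitTime = 0; // same initial value as in the snippet
  return clickTimesMs.map(now => {
    if (now - prevEmitTime > spreadDelay) {
      prevEmitTime = now;          // quiet period: emit immediately
    } else {
      prevEmitTime += spreadDelay; // burst: queue behind the previous emission
    }
    return { delay: prevEmitTime - now, emitAt: prevEmitTime };
  });
}

// Made-up click times in ms: three quick clicks, then one a bit later.
console.log(planEmissions([5000, 5010, 5020, 7500], 1000));
// → delays 0, 990, 1980, 500; emissions at 5000, 6000, 7000, 8000
```

In this model every emission ends up at least spreadDelay after the previous one, while a click after a long enough pause is passed through without delay.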

Mark van Straten's solution didn't work completely accurately for me. I found a much simpler and more accurate solution, based on the one from here.

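import { concat, from, of, EMPTY } from 'rxjs';
import { concatMap, delay, mergeMap } from 'rxjs/operators';

const source = from([100,500,1500,1501,1502,1503]).pipe(
    mergeMap(i => of(i).pipe(delay(i)))
);

const delayMs = 500;
const stretchedSource = source.pipe(
  // emit each value, then hold the queue for delayMs before the next one
  // (relies on delay() also postponing the completion of EMPTY, as in RxJS 6)
  concatMap(e => concat(of(e), EMPTY.pipe(delay(delayMs))))
);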
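
One possible reading of the inaccuracy mentioned above: the timestamp/scan approach derives each delay from the gap between source timestamps, not from the time of the previous delayed emission. The sketch below is a simplified arithmetic model of that approach, not RxJS code; the function name `scanBasedEmitTimes` is hypothetical, and the model assumes that with a concurrency of 1 each delay starts at the later of the value's arrival and the previous emission, ignoring scheduler jitter.

```javascript
// Hypothetical model of the timestamp/scan approach with concurrency 1.
function scanBasedEmitTimes(arrivalTimesMs, spacingMs) {
  let prevArrival = null;
  let prevEmit = -Infinity;
  return arrivalTimesMs.map(arrival => {
    // Delay is derived from the gap between *source* timestamps.
    let delay = 0;
    if (prevArrival !== null) {
      const delta = arrival - prevArrival;
      delay = delta > spacingMs ? 0 : spacingMs - delta;
    }
    prevArrival = arrival;
    // The delay starts once the value has arrived and the previous one is out.
    const start = Math.max(arrival, prevEmit);
    prevEmit = start + delay;
    return prevEmit;
  });
}

const emitTimes = scanBasedEmitTimes([100, 500, 1500, 1501, 1502, 1503], 1000);
const gaps = emitTimes.slice(1).map((t, i) => t - emitTimes[i]);
console.log(emitTimes); // → [ 100, 1100, 1500, 2500, 3499, 4498 ]
console.log(gaps);      // → [ 1000, 400, 1000, 999, 999 ]
```

Under these assumptions the third value follows the second after only 400 ms, because the second value was itself pushed back. The concatMap variant avoids this by always waiting delayMs after each emission before taking the next value.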

Comments
  • On every value you receive, you can map the value and return an observable that is delayed only for that value, like myStream.flatMap(val => { if (whatever reason) { return Rx.Observable.of(val).delay(10) } else { return Rx.Observable.of(val) ...
  • That's a great solution. Works perfectly
  • I thought about using window, but doesn't that mean always having to wait for the window to emit?
  • The window solution is not bulletproof, yes, and complicated as well... probably better to ignore it :-)