Skip to content
This repository was archived by the owner on Jun 1, 2024. It is now read-only.
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions packages/monix-reactive/rootdoc.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ Create observables:

| | |
|-------------------|--------------------------------------------------------------------------------------- |
| {@link Observable.empty} | Creates an observable that doesn't emit anything, but immediately calls onComplete instead. |
| {@link Observable.pure} | Returns an Observable that on execution emits the given strict value. |
| {@link empty} | Creates an observable that doesn't emit anything, but immediately calls onComplete instead. |
| {@link pure} | Returns an Observable that on execution emits the given strict value. |


## Usage
Expand Down Expand Up @@ -52,7 +52,7 @@ Usage sample:
```typescript
import * as Mx from "monix"

const stream = Mx.Observable.empty()
const stream = Mx.empty()
```

### Modules: UMD and ES 2015
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,21 @@
* limitations under the License.
*/

import { ObservableInstance } from "../instance"
import { Observable } from "./observable"
import { Scheduler } from "funfix"

declare export class EvalAlwaysObservable<+A> extends ObservableInstance<A> {
constructor(_fn: () => A): EvalAlwaysObservable;
declare export function empty<A>(): Observable<A>;

unsafeSubscribeFn(subscriber: Subscriber<A>): Cancelable;
}
declare export function pure<A>(value: A): Observable<A>;

declare export class EvalOnceObservable<+A> extends ObservableInstance<A> {
constructor(_fn: () => A): EvalOnceObservable;
declare export function never<A>(): Observable<A>;

unsafeSubscribeFn(subscriber: Subscriber<A>): Cancelable;
}
declare export function evalAlways<A>(fn: () => A): Observable<A>;

declare export function evalOnce<A>(fn: () => A): Observable<A>;

declare export function fromArray<A>(arr: Array<A>, scheduler?: Scheduler): Observable<A>;

declare export function items<A>(...items: Array<A>): Observable<A>;

declare export function loop(scheduler?: Scheduler): Observable<number>;
86 changes: 86 additions & 0 deletions packages/monix-reactive/src/builders.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*!
* Copyright (c) 2018 by The Monix.js Project Developers.
* Some rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

import { Observable } from "./observable"
import { Scheduler } from "funfix"
import { EmptyObservable } from "./internal/builders/empty"
import { NeverObservable } from "./internal/builders/never"
import { PureObservable } from "./internal/builders/pure"
import { EvalAlwaysObservable, EvalOnceObservable } from "./internal/builders/eval"
import { ArrayObservable } from "./internal/builders/array"
import { LoopObservable } from "./internal/builders/loop"

/**
* Create empty observable
*/
export function empty<A>(): Observable<A> {
return EmptyObservable
}

/**
* Create an observable which issues single given value and completes
*/
export function pure<A>(value: A): Observable<A> {
return new PureObservable(value)
}

/**
* Creates an observable that never issues any elements, completes or fails
*/
export function never<A>(): Observable<A> {
return NeverObservable
}

/**
* Creates an observable that issues single element from evaluating given expression (function)
* @param fn expression to evauate and retrieve element value
*/
export function evalAlways<A>(fn: () => A): Observable<A> {
return new EvalAlwaysObservable(fn)
}

/**
* Creates an observable that issues single element from evaluating given expression (function)
* After first evaluation it memoize result value (or error) and uses it for other subscribers
* @param fn expression to evaluate and retrieve element value
*/
export function evalOnce<A>(fn: () => A): Observable<A> {
return new EvalOnceObservable(fn)
}

/**
* Creates an observable that issues all elements of given array with backpressure
* @param arr array containing elements
* @param scheduler optional scheduler
*/
export function fromArray<A>(arr: Array<A>, scheduler?: Scheduler): Observable<A> {
return new ArrayObservable(arr, scheduler || Scheduler.global.get())
}

/**
* Creates an observable that issues all arguments
*/
export function items<A>(...items: Array<A>): Observable<A> {
return new ArrayObservable(items, Scheduler.global.get())
}

/**
* Creates an observable that loops indefinitely until stopped, issues integers starting with 0 (zero)
*/
export function loop(scheduler?: Scheduler): Observable<number> {
return new LoopObservable(scheduler || Scheduler.global.get())
}
5 changes: 3 additions & 2 deletions packages/monix-reactive/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,8 @@
* limitations under the License.
*/

export * from "./observable"
export * from "./operators"
export * from "./ack"
export * from "./observer"
export * from "./observable"
export * from "./builders"
export * from "./operators"
4 changes: 4 additions & 0 deletions packages/monix-reactive/src/internal/ack-utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import { Ack, SyncAck, AsyncAck, Stop, Continue } from "../ack"
/**
* Executes callback synchronously for given Ack
* @private
* @hidden
*/
export function syncOn(ack: Ack, callback: (t: Try<SyncAck>) => void): Ack {
if (ack === Continue || ack === Stop) {
Expand All @@ -35,6 +36,7 @@ export function syncOn(ack: Ack, callback: (t: Try<SyncAck>) => void): Ack {
/**
* Executes callback only for sync or async Continue ack
* @private
* @hidden
*/
export function syncOnContinue(ack: Ack, callback: () => void): Ack {
if (ack === Continue) {
Expand All @@ -53,6 +55,7 @@ export function syncOnContinue(ack: Ack, callback: () => void): Ack {
/**
* Executes callback only for Stop or failed async Ack
* @private
* @hidden
*/
export function syncOnStopOrFailure(ack: Ack, callback: () => void): Ack {
if (ack === Stop) {
Expand All @@ -71,6 +74,7 @@ export function syncOnStopOrFailure(ack: Ack, callback: () => void): Ack {
/**
* Tries to flatten ack
* @private
* @hidden
*/
export function syncTryFlatten(ack: Ack, scheduler: Scheduler): Ack {
if (ack === Continue || ack === Stop) {
Expand Down
6 changes: 4 additions & 2 deletions packages/monix-reactive/src/internal/builders/array.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@
* limitations under the License.
*/

import { ObservableInstance } from "../instance"
import { Observable } from "../../observable"
import { Continue, Stop, AsyncAck } from "../../ack"
import { Subscriber } from "../../observer"
import { Cancelable, Scheduler, BoolCancelable, IBoolCancelable } from "funfix"

/**
* An Observale that issues all elements of an array, with backpressure
* @private
* @hidden
*/
export class ArrayObservable<A> extends ObservableInstance<A> {
export class ArrayObservable<A> extends Observable<A> {

constructor(private readonly _arr: Array<A>,
private readonly _scheduler: Scheduler) {
Expand Down
23 changes: 0 additions & 23 deletions packages/monix-reactive/src/internal/builders/empty.js.flow

This file was deleted.

10 changes: 7 additions & 3 deletions packages/monix-reactive/src/internal/builders/empty.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
* limitations under the License.
*/

import { ObservableInstance } from "../instance"
import { Observable } from "../../observable"
import { Subscriber } from "../../observer"
import { Cancelable } from "funfix"

/**
* Completes immediately on subscribe without issueing any items
* @private
* @hidden
*/
class EmptyObservableImpl extends ObservableInstance<never> {
class EmptyObservableImpl extends Observable<never> {
unsafeSubscribeFn(subscriber: Subscriber<never>): Cancelable {
subscriber.onComplete()

Expand All @@ -34,5 +36,7 @@ class EmptyObservableImpl extends ObservableInstance<never> {
* {@link EmptyObservable} completes immediately on subscribe without issueing any values.
* EmptyObservable object uses [Bottom Type](https://en.wikipedia.org/wiki/Bottom_type)
* for elements to match all other types
* @private
* @hidden
*/
export const EmptyObservable: ObservableInstance<never> = new EmptyObservableImpl()
export const EmptyObservable: Observable<never> = new EmptyObservableImpl()
10 changes: 7 additions & 3 deletions packages/monix-reactive/src/internal/builders/eval.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
* limitations under the License.
*/

import { ObservableInstance } from "../instance"
import { Observable } from "../../observable"
import { Subscriber } from "../../observer"
import { Cancelable, Throwable } from "funfix"

/**
* An observable that evaluates the given function argument and emits its result.
* @private
* @hidden
*/
export class EvalAlwaysObservable<A> extends ObservableInstance<A> {
export class EvalAlwaysObservable<A> extends Observable<A> {
constructor(private readonly _fn: () => A) {
super()
}
Expand All @@ -48,8 +50,10 @@ export class EvalAlwaysObservable<A> extends ObservableInstance<A> {

/**
* An observable that evaluates once the given function argument and emits its result.
* @private
* @hidden
*/
export class EvalOnceObservable<A> extends ObservableInstance<A> {
export class EvalOnceObservable<A> extends Observable<A> {
private _result!: A
private _errorThrown: Throwable | null = null
private _hasResult: boolean = false
Expand Down
6 changes: 4 additions & 2 deletions packages/monix-reactive/src/internal/builders/loop.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,17 @@
* limitations under the License.
*/

import { ObservableInstance } from "../instance"
import { Observable } from "../../observable"
import { Continue, Stop, AsyncAck } from "../../ack"
import { Subscriber } from "../../observer"
import { Cancelable, Scheduler, BoolCancelable, IBoolCancelable } from "funfix"

/**
* Loops indefinitely until stopped, issues integers starting with 0 (zero)
* @private
* @hidden
*/
export class LoopObservable extends ObservableInstance<number> {
export class LoopObservable extends Observable<number> {
constructor(private readonly _scheduler: Scheduler) {
super()
}
Expand Down
23 changes: 0 additions & 23 deletions packages/monix-reactive/src/internal/builders/never.js.flow

This file was deleted.

10 changes: 7 additions & 3 deletions packages/monix-reactive/src/internal/builders/never.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,16 @@
* limitations under the License.
*/

import { ObservableInstance } from "../instance"
import { Observable } from "../../observable"
import { Subscriber } from "../../observer"
import { Cancelable } from "funfix"

/**
* Never issues elements, complets or fails
* @private
* @hidden
*/
class NeverObservableImpl extends ObservableInstance<never> {
class NeverObservableImpl extends Observable<never> {
unsafeSubscribeFn(subscriber: Subscriber<never>): Cancelable {
return Cancelable.empty()
}
Expand All @@ -32,5 +34,7 @@ class NeverObservableImpl extends ObservableInstance<never> {
* {@link NeverObservable} never issues any elements, complets of rails
* NeverObservable object uses [Bottom Type](https://en.wikipedia.org/wiki/Bottom_type)
* for elements to match all other types
* @private
* @hidden
*/
export const NeverObservable: ObservableInstance<never> = new NeverObservableImpl()
export const NeverObservable: Observable<never> = new NeverObservableImpl()
24 changes: 0 additions & 24 deletions packages/monix-reactive/src/internal/builders/pure.js.flow

This file was deleted.

Loading