Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions .github/workflows/scripts/functions/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,3 +31,10 @@ export { fetchAppCheckTokenV2 } from './fetchAppCheckToken';
export { sendFCM } from './sendFCM';

export { testFetchStream, testFetch } from './vertexaiFunctions';

export {
testStreamingCallable,
testProgressStream,
testComplexDataStream,
testStreamWithError,
} from './testStreamingCallable';
159 changes: 159 additions & 0 deletions .github/workflows/scripts/functions/src/testStreamingCallable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
import { onCall, CallableRequest, CallableResponse } from 'firebase-functions/v2/https';
import { logger } from 'firebase-functions/v2';

/**
* Test streaming callable function that sends multiple chunks of data
* This function demonstrates Server-Sent Events (SSE) streaming
*/
export const testStreamingCallable = onCall(
async (
req: CallableRequest<{ count?: number; delay?: number }>,
response?: CallableResponse<any>,
) => {
const count = req.data.count || 5;
const delay = req.data.delay || 500;

logger.info('testStreamingCallable called', { count, delay });

// Send chunks of data over time
for (let i = 0; i < count; i++) {
// Wait for the specified delay
await new Promise(resolve => setTimeout(resolve, delay));

if (response) {
await response.sendChunk({
index: i,
message: `Chunk ${i + 1} of ${count}`,
timestamp: new Date().toISOString(),
data: {
value: i * 10,
isEven: i % 2 === 0,
},
});
}
}

// Return final result
return { totalCount: count, message: 'Stream complete' };
},
);

/**
* Test streaming callable that sends progressive updates
*/
export const testProgressStream = onCall(
async (
req: CallableRequest<{ task?: string }>,
response?: CallableResponse<any>,
) => {
const task = req.data.task || 'Processing';

logger.info('testProgressStream called', { task });

const updates = [
{ progress: 0, status: 'Starting...', task },
{ progress: 25, status: 'Loading data...', task },
{ progress: 50, status: 'Processing data...', task },
{ progress: 75, status: 'Finalizing...', task },
{ progress: 100, status: 'Complete!', task },
];

for (const update of updates) {
await new Promise(resolve => setTimeout(resolve, 300));
if (response) {
await response.sendChunk(update);
}
}

return { success: true };
},
);

/**
* Test streaming with complex data types
*/
export const testComplexDataStream = onCall(
async (req: CallableRequest, response?: CallableResponse<any>) => {
logger.info('testComplexDataStream called');

const items = [
{
id: 1,
name: 'Item One',
tags: ['test', 'streaming', 'firebase'],
metadata: {
created: new Date().toISOString(),
version: '1.0.0',
},
},
{
id: 2,
name: 'Item Two',
tags: ['react-native', 'functions'],
metadata: {
created: new Date().toISOString(),
version: '1.0.1',
},
},
{
id: 3,
name: 'Item Three',
tags: ['cloud', 'streaming'],
metadata: {
created: new Date().toISOString(),
version: '2.0.0',
},
},
];

// Stream each item individually
for (const item of items) {
await new Promise(resolve => setTimeout(resolve, 200));
if (response) {
await response.sendChunk(item);
}
}

// Return summary
return {
summary: {
totalItems: items.length,
processedAt: new Date().toISOString(),
},
};
},
);

/**
* Test streaming with error handling
*/
export const testStreamWithError = onCall(
async (
req: CallableRequest<{ shouldError?: boolean; errorAfter?: number }>,
response?: CallableResponse<any>,
) => {
const shouldError = req.data.shouldError !== false;
const errorAfter = req.data.errorAfter || 2;

logger.info('testStreamWithError called', { shouldError, errorAfter });

for (let i = 0; i < 5; i++) {
if (shouldError && i === errorAfter) {
throw new Error('Simulated streaming error after chunk ' + errorAfter);
}

await new Promise(resolve => setTimeout(resolve, 300));
if (response) {
await response.sendChunk({
chunk: i,
message: `Processing chunk ${i + 1}`,
});
}
}

return {
success: true,
message: 'All chunks processed successfully',
};
},
);
28 changes: 28 additions & 0 deletions packages/functions/__tests__/functions.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import {
httpsCallable,
httpsCallableFromUrl,
HttpsErrorCode,
httpsCallableStream,
httpsCallableFromUrlStream,
type HttpsCallableOptions,
type HttpsCallable as HttpsCallableType,
type FunctionsModule,
Expand Down Expand Up @@ -101,6 +103,14 @@ describe('Cloud Functions', function () {
expect(HttpsErrorCode).toBeDefined();
});

it('`httpsCallableStream` function is properly exposed to end user', function () {
expect(httpsCallableStream).toBeDefined();
});

it('`httpsCallableFromUrlStream` function is properly exposed to end user', function () {
expect(httpsCallableFromUrlStream).toBeDefined();
});

describe('types', function () {
it('`HttpsCallableOptions` type is properly exposed to end user', function () {
const options: HttpsCallableOptions = { timeout: 5000 };
Expand Down Expand Up @@ -191,6 +201,24 @@ describe('Cloud Functions', function () {
'httpsCallableFromUrl',
);
});

it('httpsCallableStream()', function () {
const functions = (getApp() as unknown as FirebaseApp).functions();
functionsRefV9Deprecation(
() => httpsCallableStream(functions, 'example'),
() => functions.httpsCallableStream('example'),
'httpsCallableStream',
);
});

it('httpsCallableFromUrlStream()', function () {
const functions = (getApp() as unknown as FirebaseApp).functions();
functionsRefV9Deprecation(
() => httpsCallableFromUrlStream(functions, 'https://example.com/example'),
() => functions.httpsCallableFromUrlStream('https://example.com/example'),
'httpsCallableFromUrlStream',
);
});
});
});
});
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package io.invertase.firebase.functions;

/*
* Copyright (c) 2016-present Invertase Limited & Contributors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this library except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
import com.facebook.react.bridge.Arguments;
import com.facebook.react.bridge.WritableMap;
import io.invertase.firebase.interfaces.NativeEvent;

public class FirebaseFunctionsStreamHandler implements NativeEvent {
static final String FUNCTIONS_STREAMING_EVENT = "functions_streaming_event";
private static final String KEY_ID = "listenerId";
private static final String KEY_BODY = "body";
private static final String KEY_APP_NAME = "appName";
private static final String KEY_EVENT_NAME = "eventName";
private String eventName;
private WritableMap eventBody;
private String appName;
private int listenerId;

FirebaseFunctionsStreamHandler(
String eventName, WritableMap eventBody, String appName, int listenerId) {
this.eventName = eventName;
this.eventBody = eventBody;
this.appName = appName;
this.listenerId = listenerId;
}

@Override
public String getEventName() {
return eventName;
}

@Override
public WritableMap getEventBody() {
WritableMap event = Arguments.createMap();
event.putInt(KEY_ID, listenerId);
event.putMap(KEY_BODY, eventBody);
event.putString(KEY_APP_NAME, appName);
event.putString(KEY_EVENT_NAME, eventName);
return event;
}

@Override
public String getFirebaseAppName() {
return appName;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,51 @@ public void httpsCallableFromUrl(
exception -> handleFunctionsException(exception, promise));
}

@Override
public void httpsCallableStream(
String appName,
String region,
String emulatorHost,
double emulatorPort,
String name,
ReadableMap data,
ReadableMap options,
double listenerId) {

Object callableData = data.toHashMap().get(DATA_KEY);

// Convert emulatorPort to Integer (null if not using emulator)
Integer port = emulatorHost != null ? (int) emulatorPort : null;

module.httpsCallableStream(
appName, region, emulatorHost, port, name, callableData, options, (int) listenerId);
}

@Override
public void httpsCallableStreamFromUrl(
String appName,
String region,
String emulatorHost,
double emulatorPort,
String url,
ReadableMap data,
ReadableMap options,
double listenerId) {

Object callableData = data.toHashMap().get(DATA_KEY);

// Convert emulatorPort to Integer (null if not using emulator)
Integer port = emulatorHost != null ? (int) emulatorPort : null;

module.httpsCallableStreamFromUrl(
appName, region, emulatorHost, port, url, callableData, options, (int) listenerId);
}

@Override
public void removeFunctionsStreaming(String appName, String region, double listenerId) {
module.removeFunctionsStreamingListener((int) listenerId);
}

private void handleFunctionsException(Exception exception, Promise promise) {
Object details = null;
String code = "UNKNOWN";
Expand Down
Loading
Loading