@@ -12,7 +12,6 @@ interface ChangeEvent {
1212 column ?: string
1313}
1414
15- // Add this new interface
1615interface CDCEventPayload {
1716 action : string
1817 schema : string
@@ -93,10 +92,16 @@ export class ChangeDataCapturePlugin extends StarbasePlugin {
9392 }
9493
9594 try {
95+ // Strip out RETURNING clause before parsing
96+ const sqlWithoutReturning = opts . sql . replace (
97+ / \s + R E T U R N I N G \s + .* $ / i,
98+ ''
99+ )
100+
96101 // Parse the SQL statement
97- const ast = parser . astify ( opts . sql )
102+ const ast = parser . astify ( sqlWithoutReturning )
98103 const astObject = Array . isArray ( ast ) ? ast [ 0 ] : ast
99- const type = ast . type || ast [ 0 ] . type
104+ const type = astObject . type
100105
101106 if ( type === 'insert' ) {
102107 this . queryEventDetected ( 'INSERT' , astObject , opts . result )
@@ -106,7 +111,7 @@ export class ChangeDataCapturePlugin extends StarbasePlugin {
106111 this . queryEventDetected ( 'UPDATE' , astObject , opts . result )
107112 }
108113 } catch ( error ) {
109- console . error ( 'Error parsing SQL in CDC plugin:' , error )
114+ console . error ( 'Error parsing SQL in CDC plugin:' , opts ?. sql , error )
110115 }
111116
112117 return opts . result
@@ -201,7 +206,7 @@ export class ChangeDataCapturePlugin extends StarbasePlugin {
201206 ) {
202207 // For any registered callback to the `onEvent` of our CDC plugin, we
203208 // will execute it after the response has been returned as to not impact
204- // roundtrip query times – hence the usage of `ctx.waitUntil(...)`
209+ // roundtrip query times – hence the usage of `ctx.waitUntil(...)`
205210 const wrappedCallback = async ( payload : CDCEventPayload ) => {
206211 const result = callback ( payload )
207212 if ( result instanceof Promise && ctx ) {
0 commit comments