11import type { Tx } from "@ctrlplane/db" ;
22import type { InsertResource } from "@ctrlplane/db/schema" ;
3+ import type { Span } from "@ctrlplane/logger" ;
34
45import {
56 getResources ,
@@ -8,14 +9,13 @@ import {
89 upsertResources ,
910} from "@ctrlplane/db" ;
1011import * as schema from "@ctrlplane/db/schema" ;
11- import { logger } from "@ctrlplane/logger" ;
12+ import { SpanStatusCode } from "@ctrlplane/logger" ;
1213import { getAffectedVariables } from "@ctrlplane/rule-engine" ;
1314
1415import { eventDispatcher } from "../index.js" ;
16+ import { createSpanWrapper } from "../span.js" ;
1517import { groupResourcesByHook } from "./group-resources-by-hook.js" ;
1618
17- const log = logger . child ( { label : "upsert-resources" } ) ;
18-
1919export type ResourceToInsert = Omit <
2020 InsertResource ,
2121 "providerId" | "workspaceId"
@@ -42,126 +42,141 @@ const getPreviousResources = (
4242 ) ;
4343} ;
4444
45- export const handleResourceProviderScan = async (
46- tx : Tx ,
47- workspaceId : string ,
48- providerId : string ,
49- resourcesToInsert : ResourceToInsert [ ] ,
50- ) => {
51- log . info ( `Starting resource upsert: ${ resourcesToInsert . length } resources` ) ;
52- try {
53- const { toIgnore, toInsert, toUpdate, toDelete } =
54- await groupResourcesByHook (
55- tx ,
56- workspaceId ,
57- providerId ,
58- resourcesToInsert ,
59- ) ;
60- log . info (
61- `found ${ toInsert . length } resources to insert and ${ toUpdate . length } resources to update ` ,
62- ) ;
63-
64- const previousResources = await getPreviousResources (
65- tx ,
66- workspaceId ,
67- toUpdate ,
68- ) ;
69-
70- const previousVariables = Object . fromEntries (
71- previousResources . map ( ( r ) => [ r . identifier , r . variables ] ) ,
72- ) ;
45+ export const handleResourceProviderScan = createSpanWrapper (
46+ "handleResourceProviderScan" ,
47+ async (
48+ span : Span ,
49+ tx : Tx ,
50+ workspaceId : string ,
51+ providerId : string ,
52+ resourcesToInsert : ResourceToInsert [ ] ,
53+ ) => {
54+ span . setAttribute ( "workspace.id" , workspaceId ) ;
55+ span . setAttribute ( "provider.id" , providerId ) ;
56+ span . setAttribute ( "resources.count" , resourcesToInsert . length ) ;
57+
58+ try {
59+ const { toIgnore, toInsert, toUpdate, toDelete } =
60+ await groupResourcesByHook (
61+ tx ,
62+ workspaceId ,
63+ providerId ,
64+ resourcesToInsert ,
65+ ) ;
66+ span . setAttribute ( "resources.toInsert" , toInsert . length ) ;
67+ span . setAttribute ( "resources.toUpdate" , toUpdate . length ) ;
68+ span . setAttribute ( "resources.toDelete" , toDelete . length ) ;
7369
74- const [ insertedResources , updatedResources ] = await Promise . all ( [
75- upsertResources (
70+ const previousResources = await getPreviousResources (
7671 tx ,
7772 workspaceId ,
78- toInsert . map ( ( r ) => ( { ...r , providerId } ) ) ,
79- ) ,
80- upsertResources (
81- tx ,
82- workspaceId ,
83- toUpdate . map ( ( r ) => ( { ...r , providerId } ) ) ,
84- ) ,
85- ] ) ;
73+ toUpdate ,
74+ ) ;
8675
87- log . info (
88- `inserted ${ insertedResources . length } resources and updated ${ updatedResources . length } resources` ,
89- ) ;
76+ const previousVariables = Object . fromEntries (
77+ previousResources . map ( ( r ) => [ r . identifier , r . variables ] ) ,
78+ ) ;
9079
91- if ( toDelete . length > 0 )
92- await tx
93- . update ( schema . resource )
94- . set ( { deletedAt : new Date ( ) } )
95- . where (
96- inArray (
97- schema . resource . id ,
98- toDelete . map ( ( r ) => r . id ) ,
80+ const [ insertedResources , updatedResources ] = await Promise . all ( [
81+ upsertResources (
82+ tx ,
83+ workspaceId ,
84+ toInsert . map ( ( r ) => ( { ...r , providerId } ) ) ,
85+ ) ,
86+ upsertResources (
87+ tx ,
88+ workspaceId ,
89+ toUpdate . map ( ( r ) => ( { ...r , providerId } ) ) ,
90+ ) ,
91+ ] ) ;
92+
93+ if ( toDelete . length > 0 ) {
94+ const deletedResources = await tx
95+ . update ( schema . resource )
96+ . set ( { deletedAt : new Date ( ) } )
97+ . where (
98+ inArray (
99+ schema . resource . id ,
100+ toDelete . map ( ( r ) => r . id ) ,
101+ ) ,
102+ )
103+ . returning ( ) ;
104+ await Promise . all (
105+ deletedResources . map ( ( r ) =>
106+ eventDispatcher . dispatchResourceDeleted ( r ) ,
99107 ) ,
100108 ) ;
109+ span . setAttribute ( "resources.deleted" , deletedResources . length ) ;
110+ }
101111
102- // const insertJobs = insertedResources.map((r) => ({ name: r.id, data: r }));
103- // const deleteJobs = toDelete.map((r) => ({ name: r.id, data: r }));
104- const changedResources = updatedResources . filter ( ( r ) => {
105- const previous = previousResources . find (
106- ( pr ) =>
107- pr . identifier === r . identifier && pr . workspaceId === r . workspaceId ,
108- ) ;
109- if ( previous == null ) return true ;
110- return isResourceChanged ( previous , r ) ;
111- } ) ;
112-
113- await Promise . all (
114- insertedResources . map ( ( r ) => eventDispatcher . dispatchResourceCreated ( r ) ) ,
115- ) ;
116- await Promise . all (
117- toDelete . map ( ( r ) => eventDispatcher . dispatchResourceDeleted ( r ) ) ,
118- ) ;
112+ const changedResources = updatedResources . filter ( ( r ) => {
113+ const previous = previousResources . find (
114+ ( pr ) =>
115+ pr . identifier === r . identifier && pr . workspaceId === r . workspaceId ,
116+ ) ;
117+ if ( previous == null ) return true ;
118+ return isResourceChanged ( previous , r ) ;
119+ } ) ;
119120
120- if ( changedResources . length > 0 ) {
121121 await Promise . all (
122- changedResources . map ( async ( r ) => {
123- const previous = previousResources . find (
124- ( pr ) =>
125- pr . identifier === r . identifier &&
126- pr . workspaceId === r . workspaceId ,
127- ) ;
128- if ( previous != null )
129- await eventDispatcher . dispatchResourceUpdated ( previous , r ) ;
130- } ) ,
122+ insertedResources . map ( ( r ) =>
123+ eventDispatcher . dispatchResourceCreated ( r ) ,
124+ ) ,
131125 ) ;
132- }
126+ span . setAttribute ( "resources.inserted" , insertedResources . length ) ;
127+
128+ if ( changedResources . length > 0 ) {
129+ await Promise . all (
130+ changedResources . map ( async ( r ) => {
131+ const previous = previousResources . find (
132+ ( pr ) =>
133+ pr . identifier === r . identifier &&
134+ pr . workspaceId === r . workspaceId ,
135+ ) ;
136+ if ( previous != null )
137+ await eventDispatcher . dispatchResourceUpdated ( previous , r ) ;
138+ } ) ,
139+ ) ;
140+ }
141+ span . setAttribute ( "resources.updated" , updatedResources . length ) ;
133142
134- for ( const resource of insertedResources ) {
135- const { variables } = resource ;
136- for ( const variable of variables )
137- await eventDispatcher . dispatchResourceVariableCreated ( variable ) ;
138- }
143+ for ( const resource of insertedResources ) {
144+ const { variables } = resource ;
145+ for ( const variable of variables )
146+ await eventDispatcher . dispatchResourceVariableCreated ( variable ) ;
147+ }
139148
140- for ( const resource of updatedResources ) {
141- const { variables } = resource ;
142- const previousVars = previousVariables [ resource . identifier ] ?? [ ] ;
149+ for ( const resource of updatedResources ) {
150+ const { variables } = resource ;
151+ const previousVars = previousVariables [ resource . identifier ] ?? [ ] ;
143152
144- const affectedVariables = getAffectedVariables ( previousVars , variables ) ;
145- for ( const variable of affectedVariables ) {
146- const prev = previousVariables [ resource . identifier ] ?. find (
147- ( v ) => v . key === variable . key ,
148- ) ;
149- if ( prev != null )
150- await eventDispatcher . dispatchResourceVariableUpdated ( prev , variable ) ;
151- if ( prev == null )
152- await eventDispatcher . dispatchResourceVariableCreated ( variable ) ;
153+ const affectedVariables = getAffectedVariables ( previousVars , variables ) ;
154+ for ( const variable of affectedVariables ) {
155+ const prev = previousVariables [ resource . identifier ] ?. find (
156+ ( v ) => v . key === variable . key ,
157+ ) ;
158+ if ( prev != null )
159+ await eventDispatcher . dispatchResourceVariableUpdated (
160+ prev ,
161+ variable ,
162+ ) ;
163+ if ( prev == null )
164+ await eventDispatcher . dispatchResourceVariableCreated ( variable ) ;
165+ }
153166 }
154- }
155167
156- log . info ( "completed handling resource provider scan" ) ;
157- return {
158- ignored : toIgnore ,
159- inserted : insertedResources ,
160- updated : updatedResources ,
161- deleted : toDelete ,
162- } ;
163- } catch ( error ) {
164- log . error ( "Error upserting resources" , { error } ) ;
165- throw error ;
166- }
167- } ;
168+ return {
169+ ignored : toIgnore ,
170+ inserted : insertedResources ,
171+ updated : updatedResources ,
172+ deleted : toDelete ,
173+ } ;
174+ } catch ( error ) {
175+ span . setStatus ( {
176+ code : SpanStatusCode . ERROR ,
177+ message : String ( error ) ,
178+ } ) ;
179+ throw error ;
180+ }
181+ } ,
182+ ) ;
0 commit comments