@@ -3,6 +3,8 @@ package operator
33import (
44 "context"
55 "encoding/json"
6+ "fmt"
7+ "io"
68 "os"
79 "os/signal"
810 "sync"
@@ -45,8 +47,12 @@ func RunResourceWatch(toJsonPath, fromJsonPath string) error {
4547 var sink observationSink
4648
4749 if fromJsonPath != "" {
48- var err error
49- source , err = jsonSource (fromJsonPath )
50+ file , err := os .Open (fromJsonPath )
51+ if err != nil {
52+ return fmt .Errorf ("failed to open json file %q: %w" , fromJsonPath , err )
53+ }
54+
55+ source , err = jsonSource (file )
5056 if err != nil {
5157 return err
5258 }
@@ -59,8 +65,12 @@ func RunResourceWatch(toJsonPath, fromJsonPath string) error {
5965 }
6066
6167 if toJsonPath != "" {
62- var err error
63- sink , err = jsonSink (toJsonPath )
68+ file , err := os .OpenFile (toJsonPath , os .O_CREATE | os .O_EXCL | os .O_WRONLY | os .O_APPEND , 0664 )
69+ if err != nil {
70+ return fmt .Errorf ("failed to create json file %q: %w" , toJsonPath , err )
71+ }
72+
73+ sink , err = jsonSink (file )
6474 if err != nil {
6575 return err
6676 }
@@ -217,13 +227,7 @@ func clusterSource() (observationSource, error) {
217227 }, nil
218228}
219229
220- func jsonSource (fromJsonPath string ) (observationSource , error ) {
221- file , err := os .Open (fromJsonPath )
222- if err != nil {
223- klog .Errorf ("Failed to open json file with error %v" , err )
224- return nil , err
225- }
226-
230+ func jsonSource (file io.ReadCloser ) (observationSource , error ) {
227231 decoder := json .NewDecoder (file )
228232
229233 return func (ctx context.Context , log logr.Logger , resourceC chan <- * observe.ResourceObservation ) chan struct {} {
@@ -253,12 +257,7 @@ func jsonSource(fromJsonPath string) (observationSource, error) {
253257}
254258
255259func gitSink () (observationSink , error ) {
256- repositoryPath := "/repository"
257- if repositoryPathEnv := os .Getenv ("REPOSITORY_PATH" ); len (repositoryPathEnv ) > 0 {
258- repositoryPath = repositoryPathEnv
259- }
260-
261- gitStorage , err := storage .NewGitStorage (repositoryPath )
260+ gitStorage , err := gitInitStorage ()
262261 if err != nil {
263262 klog .Errorf ("Failed to create git storage with error %v" , err )
264263 return nil , err
@@ -269,32 +268,38 @@ func gitSink() (observationSink, error) {
269268 go func () {
270269 defer close (finished )
271270 for observation := range resourceC {
272- gvr := schema.GroupVersionResource {
273- Group : observation .Group ,
274- Version : observation .Version ,
275- Resource : observation .Resource ,
276- }
277- switch observation .ObservationType {
278- case observe .ObservationTypeAdd :
279- gitStorage .OnAdd (gvr , observation .Object )
280- case observe .ObservationTypeUpdate :
281- gitStorage .OnUpdate (gvr , observation .OldObject , observation .Object )
282- case observe .ObservationTypeDelete :
283- gitStorage .OnDelete (gvr , observation .Object )
284- }
271+ gitWrite (gitStorage , observation )
285272 }
286273 }()
287274 return finished
288275 }, nil
289276}
290277
291- func jsonSink (toJsonPath string ) (observationSink , error ) {
292- file , err := os .OpenFile (toJsonPath , os .O_CREATE | os .O_WRONLY | os .O_APPEND , 0664 )
293- if err != nil {
294- klog .Errorf ("Failed to create json file with error %v" , err )
295- return nil , err
278+ func gitInitStorage () (* storage.GitStorage , error ) {
279+ repositoryPath := "/repository"
280+ if repositoryPathEnv := os .Getenv ("REPOSITORY_PATH" ); len (repositoryPathEnv ) > 0 {
281+ repositoryPath = repositoryPathEnv
296282 }
283+ return storage .NewGitStorage (repositoryPath )
284+ }
285+
286+ func gitWrite (gitStorage * storage.GitStorage , observation * observe.ResourceObservation ) {
287+ gvr := schema.GroupVersionResource {
288+ Group : observation .Group ,
289+ Version : observation .Version ,
290+ Resource : observation .Resource ,
291+ }
292+ switch observation .ObservationType {
293+ case observe .ObservationTypeAdd :
294+ gitStorage .OnAdd (gvr , observation .Object )
295+ case observe .ObservationTypeUpdate :
296+ gitStorage .OnUpdate (gvr , observation .OldObject , observation .Object )
297+ case observe .ObservationTypeDelete :
298+ gitStorage .OnDelete (gvr , observation .Object )
299+ }
300+ }
297301
302+ func jsonSink (file io.WriteCloser ) (observationSink , error ) {
298303 encoder := json .NewEncoder (file )
299304
300305 return func (resourceC <- chan * observe.ResourceObservation ) chan struct {} {
0 commit comments