diff --git a/plugins/connectors/sharepoint/api.go b/plugins/connectors/sharepoint/api.go
new file mode 100644
index 00000000..75078905
--- /dev/null
+++ b/plugins/connectors/sharepoint/api.go
@@ -0,0 +1,191 @@
+package sharepoint
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"time"
+
+	log "github.com/cihub/seelog"
+	"golang.org/x/oauth2"
+	"golang.org/x/oauth2/microsoft"
+)
+
+// graphBaseURL is the root of the Microsoft Graph v1.0 REST API.
+const graphBaseURL = "https://graph.microsoft.com/v1.0"
+
+// SharePointAPIClient calls Microsoft Graph with the OAuth tokens of one
+// datasource and sends every request through a RetryClient.
+type SharePointAPIClient struct {
+	config      *SharePointConfig
+	httpClient  *http.Client
+	oauthConfig *oauth2.Config
+	token       *oauth2.Token
+	retryClient *RetryClient
+}
+
+// NewSharePointAPIClient builds a client from the OAuth tokens stored in config.
+//
+// It fails fast when config is nil, when no access token is stored, or when
+// the stored token has expired and no refresh token is available, because
+// requests made in those states would fail anyway.
+func NewSharePointAPIClient(config *SharePointConfig) (*SharePointAPIClient, error) {
+	if config == nil {
+		return nil, fmt.Errorf("sharepoint config is nil")
+	}
+	if config.AccessToken == "" {
+		return nil, fmt.Errorf("access token is missing, connect the datasource first")
+	}
+	expired := !config.TokenExpiry.IsZero() && config.TokenExpiry.Before(time.Now())
+	if expired && config.RefreshToken == "" {
+		return nil, fmt.Errorf("access token expired at %s and no refresh token is stored", config.TokenExpiry.Format(time.RFC3339))
+	}
+
+	oauthConfig := &oauth2.Config{
+		ClientID:     config.ClientID,
+		ClientSecret: config.ClientSecret,
+		Endpoint:     microsoft.AzureADEndpoint(config.TenantID),
+		Scopes:       []string{"https://graph.microsoft.com/.default"},
+	}
+	token := &oauth2.Token{
+		AccessToken:  config.AccessToken,
+		RefreshToken: config.RefreshToken,
+		Expiry:       config.TokenExpiry,
+	}
+
+	// The oauth2 HTTP client attaches the bearer token to each request and
+	// uses the refresh token, when present, once the access token expires.
+	httpClient := oauthConfig.Client(context.Background(), token)
+
+	return &SharePointAPIClient{
+		config:      config,
+		httpClient:  httpClient,
+		oauthConfig: oauthConfig,
+		token:       token,
+		retryClient: NewRetryClient(config.RetryConfig),
+	}, nil
+}
+
+// get issues a GET for rawURL through the retry client. A status outside 2xx
+// is returned as an error carrying a short excerpt of the response body, so
+// callers never decode an error payload as if it were data.
+func (c *SharePointAPIClient) get(ctx context.Context, rawURL string) (*http.Response, error) {
+	resp, err := c.retryClient.DoWithRetry(ctx, func() (*http.Response, error) {
+		req, err := http.NewRequestWithContext(ctx, http.MethodGet, rawURL, nil)
+		if err != nil {
+			return nil, err
+		}
+		return c.httpClient.Do(req)
+	})
+	if err != nil {
+		return nil, err
+	}
+
+	if resp.StatusCode < http.StatusOK || resp.StatusCode >= http.StatusMultipleChoices {
+		defer resp.Body.Close()
+		// Best effort: the excerpt is diagnostic only, so a read error is ignored.
+		excerpt, _ := io.ReadAll(io.LimitReader(resp.Body, 512))
+		return nil, fmt.Errorf("unexpected status %s: %s", resp.Status, excerpt)
+	}
+	return resp, nil
+}
+
+// getJSON fetches rawURL and decodes the JSON response body into out.
+func (c *SharePointAPIClient) getJSON(ctx context.Context, rawURL string, out interface{}) error {
+	resp, err := c.get(ctx, rawURL)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+
+	if err := json.NewDecoder(resp.Body).Decode(out); err != nil {
+		return fmt.Errorf("failed to decode response: %w", err)
+	}
+	return nil
+}
+
+// GetSites returns every site Graph lists, following @odata.nextLink until
+// the last page.
+func (c *SharePointAPIClient) GetSites(ctx context.Context) ([]SharePointSite, error) {
+	var allSites []SharePointSite
+	for pageURL := graphBaseURL + "/sites"; pageURL != ""; {
+		var page struct {
+			Value    []SharePointSite `json:"value"`
+			NextLink string           `json:"@odata.nextLink"`
+		}
+		if err := c.getJSON(ctx, pageURL, &page); err != nil {
+			return nil, fmt.Errorf("failed to get sites: %w", err)
+		}
+		allSites = append(allSites, page.Value...)
+		pageURL = page.NextLink
+	}
+
+	return allSites, nil
+}
+
+// GetDocumentLibraries returns the lists of site siteID whose template is
+// documentLibrary, following @odata.nextLink until the last page.
+func (c *SharePointAPIClient) GetDocumentLibraries(ctx context.Context, siteID string) ([]SharePointList, error) {
+	// The filter is percent-encoded here: a raw space in the query would be
+	// written verbatim into the HTTP request line and break it.
+	pageURL := fmt.Sprintf("%s/sites/%s/lists?$filter=list/template%%20eq%%20'documentLibrary'", graphBaseURL, siteID)
+
+	var allLists []SharePointList
+	for pageURL != "" {
+		var page struct {
+			Value    []SharePointList `json:"value"`
+			NextLink string           `json:"@odata.nextLink"`
+		}
+		if err := c.getJSON(ctx, pageURL, &page); err != nil {
+			return nil, fmt.Errorf("failed to get document libraries: %w", err)
+		}
+		allLists = append(allLists, page.Value...)
+		pageURL = page.NextLink
+	}
+
+	return allLists, nil
+}
+
+// GetItems returns all items of list listID in site siteID, requesting
+// pageSize items per Graph call.
+//
+// The signature has no way to resume from a next link, so every page is
+// followed here and the returned link is always empty. Callers that loop
+// until the link is empty therefore stop after the first call.
+func (c *SharePointAPIClient) GetItems(ctx context.Context, siteID, listID string, pageSize int) ([]SharePointItem, string, error) {
+	pageURL := fmt.Sprintf("%s/sites/%s/lists/%s/items?$expand=fields,driveItem&$top=%d", graphBaseURL, siteID, listID, pageSize)
+
+	var allItems []SharePointItem
+	for pageURL != "" {
+		var page struct {
+			Value    []SharePointItem `json:"value"`
+			NextLink string           `json:"@odata.nextLink"`
+		}
+		if err := c.getJSON(ctx, pageURL, &page); err != nil {
+			return nil, "", fmt.Errorf("failed to get items: %w", err)
+		}
+		allItems = append(allItems, page.Value...)
+		log.Debugf("[sharepoint connector] fetched %d items from list %s", len(allItems), listID)
+		pageURL = page.NextLink
+	}
+
+	return allItems, "", nil
+}
+
+// DownloadFile fetches downloadURL and returns the whole response body. The
+// body is read fully into memory before it is returned.
+func (c *SharePointAPIClient) DownloadFile(ctx context.Context, downloadURL string) ([]byte, error) {
+	resp, err := c.get(ctx, downloadURL)
+	if err != nil {
+		return nil, fmt.Errorf("failed to download file: %w", err)
+	}
+	defer resp.Body.Close()
+
+	data, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read file content: %w", err)
+	}
+	return data, nil
+}
diff --git a/plugins/connectors/sharepoint/auth.go b/plugins/connectors/sharepoint/auth.go
new file mode 100644
index 00000000..72e08a7a
--- /dev/null
+++ b/plugins/connectors/sharepoint/auth.go
@@ -0,0 +1,196 @@
+package sharepoint
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"net/url"
+	"time"
+
+	log "github.com/cihub/seelog"
+	"github.com/julienschmidt/httprouter"
+	"golang.org/x/oauth2"
+	"golang.org/x/oauth2/microsoft"
+	"infini.sh/coco/modules/common"
+	"infini.sh/framework/core/api"
+	"infini.sh/framework/core/orm"
+)
+
+// tokenExchangeTimeout bounds the authorization-code exchange request.
+const tokenExchangeTimeout = 30 * time.Second
+
+// loadDatasource fetches the datasource with the given ID. A lookup error and
+// a missing record are both reported as "datasource not found"; the
+// underlying error, if any, is logged so it is not lost.
+func loadDatasource(datasourceID string) (*common.DataSource, error) {
+	datasource := &common.DataSource{}
+	datasource.ID = datasourceID
+	exists, err := orm.Get(datasource)
+	if err != nil {
+		log.Errorf("[sharepoint connector] failed to load datasource [%s]: %v", datasourceID, err)
+	}
+	if !exists || err != nil {
+		return nil, fmt.Errorf("datasource not found")
+	}
+	return datasource, nil
+}
+
+// newOAuthConfig builds the OAuth settings used by connect and oAuthRedirect.
+// The redirect URL sent with the code exchange has to equal the one sent with
+// the authorization request, so both handlers obtain it from this function.
+func newOAuthConfig(config *SharePointConfig, req *http.Request, datasourceID string) *oauth2.Config {
+	return &oauth2.Config{
+		ClientID:     config.ClientID,
+		ClientSecret: config.ClientSecret,
+		Endpoint:     microsoft.AzureADEndpoint(config.TenantID),
+		Scopes:       []string{"https://graph.microsoft.com/.default"},
+		RedirectURL: fmt.Sprintf("%s/connector/sharepoint/oauth_redirect?datasource_id=%s",
+			getBaseURL(req), url.QueryEscape(datasourceID)),
+	}
+}
+
+// connectorConfigMap returns the datasource's connector config as a map so
+// token fields can be set or removed, or an error when it has another type.
+func connectorConfigMap(datasource *common.DataSource) (map[string]interface{}, error) {
+	configMap, ok := datasource.Connector.Config.(map[string]interface{})
+	if !ok {
+		return nil, fmt.Errorf("unexpected connector config type %T", datasource.Connector.Config)
+	}
+	return configMap, nil
+}
+
+// connect responds with the Azure AD authorization URL for the datasource
+// named by the datasource_id query parameter.
+func (p *Plugin) connect(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
+	datasourceID := req.URL.Query().Get("datasource_id")
+	if datasourceID == "" {
+		api.WriteError(w, fmt.Errorf("datasource_id is required"), http.StatusBadRequest)
+		return
+	}
+
+	datasource, err := loadDatasource(datasourceID)
+	if err != nil {
+		api.WriteError(w, err, http.StatusNotFound)
+		return
+	}
+
+	config, err := parseSharePointConfig(datasource)
+	if err != nil {
+		api.WriteError(w, err, http.StatusBadRequest)
+		return
+	}
+
+	// NOTE(review): the state value is constant and oAuthRedirect does not
+	// check it, so the flow has no CSRF protection. A per-request random
+	// state needs somewhere to be stored between the two handlers.
+	authURL := newOAuthConfig(config, req, datasourceID).AuthCodeURL("state", oauth2.AccessTypeOffline)
+
+	api.WriteJSON(w, map[string]interface{}{
+		"auth_url": authURL,
+	}, http.StatusOK)
+}
+
+// oAuthRedirect handles the OAuth callback: it exchanges the authorization
+// code for tokens, stores them in the datasource's connector config and
+// redirects the browser to the datasource edit page.
+func (p *Plugin) oAuthRedirect(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
+	query := req.URL.Query()
+
+	// The provider reports a denied or failed authorization through "error".
+	if authErr := query.Get("error"); authErr != "" {
+		api.WriteError(w, fmt.Errorf("authorization failed: %s: %s", authErr, query.Get("error_description")), http.StatusBadRequest)
+		return
+	}
+
+	datasourceID := query.Get("datasource_id")
+	code := query.Get("code")
+	if datasourceID == "" || code == "" {
+		api.WriteError(w, fmt.Errorf("missing required parameters"), http.StatusBadRequest)
+		return
+	}
+
+	datasource, err := loadDatasource(datasourceID)
+	if err != nil {
+		api.WriteError(w, err, http.StatusNotFound)
+		return
+	}
+
+	config, err := parseSharePointConfig(datasource)
+	if err != nil {
+		api.WriteError(w, err, http.StatusBadRequest)
+		return
+	}
+
+	// Check the stored config before the exchange, so a config of the wrong
+	// type cannot cause a code to be redeemed and its tokens then discarded.
+	configMap, err := connectorConfigMap(datasource)
+	if err != nil {
+		api.WriteError(w, err, http.StatusInternalServerError)
+		return
+	}
+
+	ctx, cancel := context.WithTimeout(req.Context(), tokenExchangeTimeout)
+	defer cancel()
+	token, err := newOAuthConfig(config, req, datasourceID).Exchange(ctx, code)
+	if err != nil {
+		api.WriteError(w, fmt.Errorf("failed to exchange token: %w", err), http.StatusInternalServerError)
+		return
+	}
+
+	configMap["access_token"] = token.AccessToken
+	configMap["refresh_token"] = token.RefreshToken
+	configMap["token_expiry"] = token.Expiry
+	datasource.Connector.Config = configMap
+
+	if err = orm.Update(datasource); err != nil {
+		api.WriteError(w, fmt.Errorf("failed to update datasource: %w", err), http.StatusInternalServerError)
+		return
+	}
+
+	http.Redirect(w, req, "/datasource/edit/"+url.PathEscape(datasourceID)+"?connected=true", http.StatusFound)
+}
+
+// reset removes the stored OAuth tokens from the datasource named by the
+// datasource_id query parameter.
+func (p *Plugin) reset(w http.ResponseWriter, req *http.Request, ps httprouter.Params) {
+	datasourceID := req.URL.Query().Get("datasource_id")
+	if datasourceID == "" {
+		api.WriteError(w, fmt.Errorf("datasource_id is required"), http.StatusBadRequest)
+		return
+	}
+
+	datasource, err := loadDatasource(datasourceID)
+	if err != nil {
+		api.WriteError(w, err, http.StatusNotFound)
+		return
+	}
+
+	configMap, err := connectorConfigMap(datasource)
+	if err != nil {
+		api.WriteError(w, err, http.StatusInternalServerError)
+		return
+	}
+	for _, key := range []string{"access_token", "refresh_token", "token_expiry"} {
+		delete(configMap, key)
+	}
+	datasource.Connector.Config = configMap
+
+	if err = orm.Update(datasource); err != nil {
+		api.WriteError(w, fmt.Errorf("failed to update datasource: %w", err), http.StatusInternalServerError)
+		return
+	}
+
+	api.WriteJSON(w, map[string]interface{}{
+		"success": true,
+	}, http.StatusOK)
+}
+
+// getBaseURL returns scheme://host for req, choosing https when the request
+// arrived over TLS and http otherwise.
+func getBaseURL(req *http.Request) string {
+	scheme := "http"
+	if req.TLS != nil {
+		scheme = "https"
+	}
+	return fmt.Sprintf("%s://%s", scheme, req.Host)
+}
diff --git a/plugins/connectors/sharepoint/config.go b/plugins/connectors/sharepoint/config.go
new file mode 100644
index 00000000..8b203e40
--- /dev/null
+++ b/plugins/connectors/sharepoint/config.go
@@ -0,0 +1,69 @@
+package sharepoint
+
+import (
+	"fmt"
+	"time"
+
+	"infini.sh/coco/modules/common"
+	"infini.sh/framework/core/config"
+)
+
+// parseSharePointConfig unpacks the datasource's connector config into a
+// SharePointConfig and fills unset retry settings with defaults
+// (3 retries, 1s initial delay, 1m maximum delay, backoff factor 2).
+func parseSharePointConfig(datasource *common.DataSource) (*SharePointConfig, error) {
+	if datasource == nil {
+		return nil, fmt.Errorf("datasource is nil")
+	}
+	if datasource.Connector.Config == nil {
+		return nil, fmt.Errorf("connector config is nil")
+	}
+
+	cfg, err := config.NewConfigFrom(datasource.Connector.Config)
+	if err != nil {
+		return nil, fmt.Errorf("failed to parse config: %w", err)
+	}
+
+	sharePointConfig := &SharePointConfig{}
+	if err = cfg.Unpack(sharePointConfig); err != nil {
+		return nil, fmt.Errorf("failed to unpack config: %w", err)
+	}
+
+	// A zero value means "not configured", so it is replaced by the default.
+	retry := &sharePointConfig.RetryConfig
+	if retry.MaxRetries == 0 {
+		retry.MaxRetries = 3
+	}
+	if retry.InitialDelay == 0 {
+		retry.InitialDelay = time.Second
+	}
+	if retry.MaxDelay == 0 {
+		retry.MaxDelay = time.Minute
+	}
+	if retry.BackoffFactor == 0 {
+		retry.BackoffFactor = 2.0
+	}
+
+	return sharePointConfig, nil
+}
+
+// validateSharePointConfig reports the first required setting that is empty:
+// site_url, tenant_id, client_id or client_secret, checked in that order.
+func validateSharePointConfig(config *SharePointConfig) error {
+	required := []struct {
+		name  string
+		value string
+	}{
+		{"site_url", config.SiteURL},
+		{"tenant_id", config.TenantID},
+		{"client_id", config.ClientID},
+		{"client_secret", config.ClientSecret},
+	}
+	for _, field := range required {
+		if field.value == "" {
+			return fmt.Errorf("%s is required", field.name)
+		}
+	}
+
+	return nil
+}
diff --git a/plugins/connectors/sharepoint/files.go
b/plugins/connectors/sharepoint/files.go
new file mode 100644
index 00000000..6b0d737b
--- /dev/null
+++ b/plugins/connectors/sharepoint/files.go
@@ -0,0 +1,294 @@
+package sharepoint
+
+import (
+	"context"
+	"fmt"
+	"path/filepath"
+	"strings"
+	"time"
+
+	log "github.com/cihub/seelog"
+	"infini.sh/coco/modules/common"
+	"infini.sh/framework/core/queue"
+	"infini.sh/framework/core/util"
+)
+
+// syncSharePointContent walks every site returned by the API client and
+// indexes its document libraries. A failing site is logged and skipped so the
+// remaining sites are still processed.
+func (p *Plugin) syncSharePointContent(connector *common.Connector, datasource *common.DataSource, config *SharePointConfig) error {
+	ctx := context.Background()
+	started := time.Now()
+
+	sites, err := p.apiClient.GetSites(ctx)
+	if err != nil {
+		return fmt.Errorf("failed to get sites: %w", err)
+	}
+
+	log.Infof("[sharepoint connector] found %d sites", len(sites))
+
+	for _, site := range sites {
+		if err := p.processSite(ctx, connector, datasource, config, site); err != nil {
+			log.Errorf("[sharepoint connector] failed to process site %s: %v", site.Name, err)
+		}
+	}
+
+	log.Debugf("[sharepoint connector] processed %d sites in %v", len(sites), time.Since(started))
+	return nil
+}
+
+// processSite indexes the document libraries of one site. When
+// config.IncludeLibraries is non-empty, only libraries named in it are
+// processed. A failing library is logged and skipped.
+func (p *Plugin) processSite(ctx context.Context, connector *common.Connector, datasource *common.DataSource, config *SharePointConfig, site SharePointSite) error {
+	log.Debugf("[sharepoint connector] processing site: %s", site.Name)
+
+	libraries, err := p.apiClient.GetDocumentLibraries(ctx, site.ID)
+	if err != nil {
+		return fmt.Errorf("failed to get document libraries: %w", err)
+	}
+
+	for _, library := range libraries {
+		if len(config.IncludeLibraries) > 0 && !contains(config.IncludeLibraries, library.Name) {
+			continue
+		}
+
+		if err := p.processDocumentLibrary(ctx, connector, datasource, config, site, library); err != nil {
+			log.Errorf("[sharepoint connector] failed to process library %s: %v", library.Name, err)
+		}
+	}
+
+	return nil
+}
+
+// processDocumentLibrary indexes the items of one document library, using
+// p.PageSize (100 when unset) as the page size.
+//
+// GetItems is called exactly once. It takes no continuation link, so calling
+// it again would only request the first page again, and looping on a
+// non-empty link would never terminate. A non-empty link is logged instead.
+func (p *Plugin) processDocumentLibrary(ctx context.Context, connector *common.Connector, datasource *common.DataSource, config *SharePointConfig, site SharePointSite, library SharePointList) error {
+	log.Debugf("[sharepoint connector] processing library: %s", library.Name)
+
+	pageSize := p.PageSize
+	if pageSize == 0 {
+		pageSize = 100
+	}
+
+	items, nextLink, err := p.apiClient.GetItems(ctx, site.ID, library.ID, pageSize)
+	if err != nil {
+		return fmt.Errorf("failed to get items: %w", err)
+	}
+
+	for _, item := range items {
+		if err := p.processSharePointItem(ctx, connector, datasource, config, site, library, item); err != nil {
+			log.Errorf("[sharepoint connector] failed to process item %s: %v", item.Name, err)
+		}
+	}
+
+	if nextLink != "" {
+		log.Warnf("[sharepoint connector] library %s has more items than were returned; remaining pages were not fetched", library.Name)
+	}
+
+	return nil
+}
+
+// processSharePointItem converts one library item into a document and pushes
+// it to the queue. Folders, items without file metadata, files whose
+// extension is not in config.FileTypes (when set) and files under an
+// excluded folder are skipped.
+func (p *Plugin) processSharePointItem(ctx context.Context, connector *common.Connector, datasource *common.DataSource, config *SharePointConfig, site SharePointSite, library SharePointList, item SharePointItem) error {
+	// item.File is a pointer that is nil when the payload has no "file"
+	// object; it is dereferenced below, so such items must be skipped.
+	if item.Folder != nil || item.File == nil {
+		return nil
+	}
+
+	if len(config.FileTypes) > 0 && !matchesFileType(config.FileTypes, item.Name) {
+		return nil
+	}
+
+	for _, excludeFolder := range config.ExcludeFolders {
+		if excludeFolder != "" && strings.Contains(item.ParentReference.Path, excludeFolder) {
+			return nil
+		}
+	}
+
+	mimeType := item.File.MimeType
+	document := common.Document{
+		Source: common.DataSourceReference{
+			ID:   datasource.ID,
+			Name: datasource.Name,
+			Type: "connector",
+		},
+		Title: item.Name,
+		Type:  getFileType(mimeType),
+		Size:  int(item.Size),
+		URL:   item.WebURL,
+		Owner: &common.UserInfo{
+			UserName: item.CreatedBy.DisplayName,
+			UserID:   item.CreatedBy.Email,
+		},
+		Icon: getFileIcon(mimeType),
+	}
+
+	document.System = datasource.System
+	document.ID = util.MD5digest(fmt.Sprintf("%s-%s-%s", datasource.ID, site.ID, item.ID))
+	// The item type carries only a last-modified time, so it is used for both.
+	document.Created = &item.LastModified
+	document.Updated = &item.LastModified
+
+	categoryPath := fmt.Sprintf("/%s/%s%s", site.Name, library.Name, item.ParentReference.Path)
+	document.Category = categoryPath
+	document.Categories = strings.Split(strings.Trim(categoryPath, "/"), "/")
+
+	document.RichCategories = []common.RichLabel{
+		{Label: site.Name, Key: site.ID, Icon: "site"},
+		{Label: library.Name, Key: library.ID, Icon: "library"},
+	}
+
+	if shouldExtractContent(mimeType) {
+		content, err := p.extractFileContent(ctx, item)
+		if err != nil {
+			log.Warnf("[sharepoint connector] failed to extract content for %s: %v", item.Name, err)
+		} else {
+			document.Content = content
+		}
+	}
+
+	p.saveDocToQueue(document)
+
+	return nil
+}
+
+// matchesFileType reports whether name's extension is listed in allowed.
+// Entries are compared case-insensitively and may be written with or without
+// the leading dot ("pdf" and ".pdf" are equivalent).
+func matchesFileType(allowed []string, name string) bool {
+	ext := strings.ToLower(filepath.Ext(name))
+	for _, entry := range allowed {
+		entry = strings.ToLower(strings.TrimSpace(entry))
+		if entry != "" && !strings.HasPrefix(entry, ".") {
+			entry = "." + entry
+		}
+		if entry == ext {
+			return true
+		}
+	}
+	return false
+}
+
+// extractFileContent downloads the item through the drive named in its
+// parent reference and returns the text extracted for its MIME type.
+func (p *Plugin) extractFileContent(ctx context.Context, item SharePointItem) (string, error) {
+	if item.File == nil {
+		return "", fmt.Errorf("item %s has no file metadata", item.ID)
+	}
+	if item.ParentReference.DriveID == "" {
+		return "", fmt.Errorf("item %s has no drive id", item.ID)
+	}
+
+	// DriveID identifies a drive, so the content endpoint is addressed under
+	// /drives/{drive-id}; it is not a site ID and must not go under /sites/.
+	downloadURL := fmt.Sprintf("https://graph.microsoft.com/v1.0/drives/%s/items/%s/content",
+		item.ParentReference.DriveID, item.ID)
+
+	content, err := p.apiClient.DownloadFile(ctx, downloadURL)
+	if err != nil {
+		return "", err
+	}
+
+	return extractTextFromBytes(content, item.File.MimeType), nil
+}
+
+// saveDocToQueue serialises document to JSON and pushes it to the plugin's
+// queue. It logs the error and panics when the push fails.
+func (p *Plugin) saveDocToQueue(document common.Document) {
+	data := util.MustToJSONBytes(document)
+	if err := queue.Push(queue.SmartGetOrInitConfig(p.Queue), data); err != nil {
+		log.Errorf("[sharepoint connector] failed to push document to queue: %v", err)
+		panic(err)
+	}
+}
+
+// contains reports whether item is an element of slice.
+func contains(slice []string, item string) bool {
+	for _, s := range slice {
+		if s == item {
+			return true
+		}
+	}
+	return false
+}
+
+// getFileType maps a MIME type to a coarse document type. Office formats are
+// recognised by both their legacy names (msword, ms-excel, ms-powerpoint) and
+// their Open XML names (wordprocessingml, spreadsheetml, presentationml).
+func getFileType(mimeType string) string {
+	switch {
+	case strings.HasPrefix(mimeType, "text/"):
+		return "text"
+	case strings.HasPrefix(mimeType, "image/"):
+		return "image"
+	case strings.HasPrefix(mimeType, "video/"):
+		return "video"
+	case strings.HasPrefix(mimeType, "audio/"):
+		return "audio"
+	case strings.Contains(mimeType, "pdf"):
+		return "pdf"
+	case strings.Contains(mimeType, "word"):
+		return "document"
+	case strings.Contains(mimeType, "excel"), strings.Contains(mimeType, "spreadsheetml"):
+		return "spreadsheet"
+	case strings.Contains(mimeType, "powerpoint"), strings.Contains(mimeType, "presentationml"):
+		return "presentation"
+	default:
+		return "file"
+	}
+}
+
+// getFileIcon maps a MIME type to an icon name, recognising Office formats by
+// both their legacy and their Open XML MIME names.
+func getFileIcon(mimeType string) string {
+	switch {
+	case strings.Contains(mimeType, "pdf"):
+		return "pdf"
+	case strings.Contains(mimeType, "word"):
+		return "word"
+	case strings.Contains(mimeType, "excel"), strings.Contains(mimeType, "spreadsheetml"):
+		return "excel"
+	case strings.Contains(mimeType, "powerpoint"), strings.Contains(mimeType, "presentationml"):
+		return "powerpoint"
+	case strings.HasPrefix(mimeType, "image/"):
+		return "image"
+	case strings.HasPrefix(mimeType, "video/"):
+		return "video"
+	case strings.HasPrefix(mimeType, "audio/"):
+		return "audio"
+	default:
+		return "file"
+	}
+}
+
+// shouldExtractContent reports whether the file body should be downloaded for
+// text extraction. Only plain text and HTML qualify, because those are the
+// formats extractTextFromBytes can turn into text.
+func shouldExtractContent(mimeType string) bool {
+	for _, t := range []string{"text/plain", "text/html"} {
+		if strings.Contains(mimeType, t) {
+			return true
+		}
+	}
+	return false
+}
+
+// extractTextFromBytes returns content as a string for text/* MIME types and
+// an empty string for every other type. No parser for binary formats such as
+// PDF or Word is wired in here, and an empty result keeps placeholder text
+// out of the indexed document content.
+func extractTextFromBytes(content []byte, mimeType string) string {
+	if strings.HasPrefix(mimeType, "text/") {
+		return string(content)
+	}
+	return ""
+}
diff --git a/plugins/connectors/sharepoint/plugin.go b/plugins/connectors/sharepoint/plugin.go
new file mode 100644
index 00000000..e778c802
--- /dev/null
+++ b/plugins/connectors/sharepoint/plugin.go
@@ -0,0 +1,83 @@
+/* Copyright © INFINI LTD. All rights reserved.
+ * Web: https://infinilabs.com
+ * Email: hello#infini.ltd */
+
+package sharepoint
+
+import (
+	"fmt"
+	"time"
+
+	log "github.com/cihub/seelog"
+	"infini.sh/coco/modules/common"
+	"infini.sh/coco/plugins/connectors"
+	"infini.sh/framework/core/api"
+	"infini.sh/framework/core/module"
+)
+
+// ConnectorSharePoint is the connector name returned by Name.
+const ConnectorSharePoint = "sharepoint"
+
+// Plugin indexes SharePoint document libraries through Microsoft Graph.
+type Plugin struct {
+	connectors.BasePlugin
+	// apiClient is replaced on every Scan and read by the sync helpers.
+	apiClient *SharePointAPIClient
+}
+
+func init() {
+	module.RegisterUserPlugin(&Plugin{})
+}
+
+// Setup initialises the base plugin and registers the OAuth HTTP endpoints.
+func (p *Plugin) Setup() {
+	p.BasePlugin.Init(fmt.Sprintf("connector.%s", ConnectorSharePoint), "indexing sharepoint", p)
+
+	api.HandleUIMethod(api.GET, "/connector/sharepoint/connect", p.connect, api.RequireLogin())
+	api.HandleUIMethod(api.POST, "/connector/sharepoint/reset", p.reset, api.RequireLogin())
+	api.HandleUIMethod(api.GET, "/connector/sharepoint/oauth_redirect", p.oAuthRedirect, api.RequireLogin())
+}
+
+// Start delegates to BasePlugin.Start, passing a duration of 30 seconds.
+func (p *Plugin) Start() error {
+	return p.BasePlugin.Start(time.Second * 30)
+}
+
+// Stop does nothing and always returns nil.
+func (p *Plugin) Stop() error {
+	return nil
+}
+
+// Name returns the connector name.
+func (p *Plugin) Name() string {
+	return ConnectorSharePoint
+}
+
+// Scan runs one sync pass for datasource; failures are logged, not returned.
+func (p *Plugin) Scan(connector *common.Connector, datasource *common.DataSource) {
+	log.Infof("[sharepoint connector] starting scan for datasource [%s]", datasource.Name)
+
+	config, err := parseSharePointConfig(datasource)
+	if err != nil {
+		log.Errorf("[sharepoint connector] failed to parse config: %v", err)
+		return
+	}
+	// Without a stored token no API call can succeed, so skip the scan.
+	if config.AccessToken == "" {
+		log.Warnf("[sharepoint connector] datasource [%s] is not connected, skipping scan", datasource.Name)
+		return
+	}
+
+	client, err := NewSharePointAPIClient(config)
+	if err != nil {
+		log.Errorf("[sharepoint connector] failed to create API client: %v", err)
+		return
+	}
+	p.apiClient = client
+
+	if err = p.syncSharePointContent(connector, datasource, config); err != nil {
+		log.Errorf("[sharepoint connector] sync failed: %v", err)
+		return
+	}
+	log.Infof("[sharepoint connector] completed scan for datasource [%s]", datasource.Name)
+}
diff --git a/plugins/connectors/sharepoint/retry.go b/plugins/connectors/sharepoint/retry.go
new file mode 100644
index 00000000..d3e5b195
--- /dev/null
+++ b/plugins/connectors/sharepoint/retry.go
@@ -0,0 +1,137 @@
+package sharepoint
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"io"
+	"math"
+	"net/http"
+	"strconv"
+	"time"
+
+	log "github.com/cihub/seelog"
+)
+
+// RetryClient re-runs HTTP calls that fail with transient errors, waiting an
+// exponentially growing delay between attempts.
+type RetryClient struct {
+	config RetryConfig
+}
+
+// NewRetryClient returns a RetryClient for config. Zero-valued fields fall
+// back to 3 retries, a 1s initial delay, a 1m maximum delay and a factor of 2.
+func NewRetryClient(config RetryConfig) *RetryClient {
+	if config.MaxRetries == 0 {
+		config.MaxRetries = 3
+	}
+	if config.InitialDelay == 0 {
+		config.InitialDelay = time.Second
+	}
+	if config.MaxDelay == 0 {
+		config.MaxDelay = time.Minute
+	}
+	if config.BackoffFactor == 0 {
+		config.BackoffFactor = 2.0
+	}
+
+	return &RetryClient{config: config}
+}
+
+// DoWithRetry calls fn until it yields a response with a non-retryable status,
+// fails with a non-retryable error, or MaxRetries retries have been used.
+//
+// Between attempts it waits for the exponential backoff delay, or for the
+// server's Retry-After value when that is longer (capped at MaxDelay). The
+// wait is abandoned with ctx.Err() as soon as ctx is done. On success the
+// caller owns the response and must close its body.
+func (r *RetryClient) DoWithRetry(ctx context.Context, fn func() (*http.Response, error)) (*http.Response, error) {
+	var (
+		lastErr    error
+		retryAfter time.Duration
+	)
+
+	for attempt := 0; attempt <= r.config.MaxRetries; attempt++ {
+		if attempt > 0 {
+			delay := r.calculateDelay(attempt)
+			if retryAfter > delay {
+				delay = retryAfter
+			}
+			log.Debugf("[sharepoint connector] retrying request after %v (attempt %d/%d)", delay, attempt, r.config.MaxRetries)
+
+			timer := time.NewTimer(delay)
+			select {
+			case <-ctx.Done():
+				timer.Stop()
+				return nil, ctx.Err()
+			case <-timer.C:
+			}
+		}
+
+		resp, err := fn()
+		if err != nil {
+			lastErr = err
+			if !r.isRetryableError(err) {
+				return nil, err
+			}
+			retryAfter = 0
+			continue
+		}
+		if !r.isRetryableStatusCode(resp.StatusCode) {
+			return resp, nil
+		}
+
+		retryAfter = r.serverDelay(resp)
+		// Drain a bounded part of the body before closing so the underlying
+		// connection can be reused for the next attempt.
+		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, 64<<10))
+		resp.Body.Close()
+		lastErr = fmt.Errorf("HTTP %d: %s", resp.StatusCode, resp.Status)
+	}
+
+	return nil, fmt.Errorf("max retries exceeded: %w", lastErr)
+}
+
+// calculateDelay returns InitialDelay * BackoffFactor^(attempt-1), limited to
+// MaxDelay.
+func (r *RetryClient) calculateDelay(attempt int) time.Duration {
+	backoff := float64(r.config.InitialDelay) * math.Pow(r.config.BackoffFactor, float64(attempt-1))
+	// Clamp while still a float: converting a value beyond the int64 range
+	// to time.Duration does not produce a meaningful result.
+	if math.IsNaN(backoff) || backoff >= float64(r.config.MaxDelay) {
+		return r.config.MaxDelay
+	}
+	return time.Duration(backoff)
+}
+
+// serverDelay returns the wait requested by the response's Retry-After header
+// when it is a positive number of seconds, limited to MaxDelay. Any other
+// header value, including the HTTP-date form, yields 0.
+func (r *RetryClient) serverDelay(resp *http.Response) time.Duration {
+	seconds, err := strconv.Atoi(resp.Header.Get("Retry-After"))
+	if err != nil || seconds <= 0 {
+		return 0
+	}
+	if maxSeconds := int(r.config.MaxDelay / time.Second); seconds > maxSeconds {
+		return r.config.MaxDelay
+	}
+	return time.Duration(seconds) * time.Second
+}
+
+// isRetryableError reports whether a failed call should be attempted again.
+// Every error is retried except context cancellation and deadline expiry,
+// which cannot succeed on a later attempt with the same context.
+func (r *RetryClient) isRetryableError(err error) bool {
+	return !errors.Is(err, context.Canceled) && !errors.Is(err, context.DeadlineExceeded)
+}
+
+// isRetryableStatusCode reports whether statusCode is one of 429, 500, 502,
+// 503 or 504.
+func (r *RetryClient) isRetryableStatusCode(statusCode int) bool {
+	switch statusCode {
+	case 429, 500, 502, 503, 504:
+		return true
+	default:
+		return false
+	}
+}
diff --git a/plugins/connectors/sharepoint/types.go b/plugins/connectors/sharepoint/types.go
new file mode 100644
index 00000000..37b96541
--- /dev/null
+++ b/plugins/connectors/sharepoint/types.go
@@ -0,0 +1,111 @@
+package sharepoint
+
+import (
+	"time"
+)
+
+// SharePointConfig is the connector configuration unpacked from a datasource.
+type SharePointConfig struct {
+	SiteURL      string `config:"site_url" json:"site_url"`
+	TenantID     string `config:"tenant_id" json:"tenant_id"`
+	ClientID     string `config:"client_id" json:"client_id"`
+	ClientSecret string `config:"client_secret" json:"client_secret"`
+	AuthMethod   string `config:"auth_method" json:"auth_method"` // oauth2, certificate, password
+
+	// Optional content filters.
+	IncludeLibraries []string `config:"include_libraries" json:"include_libraries"`
+	ExcludeFolders   []string `config:"exclude_folders" json:"exclude_folders"`
+	FileTypes        []string `config:"file_types" json:"file_types"`
+
+	// Retry settings for API requests.
+	RetryConfig RetryConfig `config:"retry" json:"retry"`
+
+	// OAuth tokens.
+	AccessToken  string    `config:"access_token" json:"access_token"`
+	RefreshToken string    `config:"refresh_token" json:"refresh_token"`
+	TokenExpiry  time.Time `config:"token_expiry" json:"token_expiry"`
+}
+
+// RetryConfig holds the retry count and backoff timing for failed requests.
+type RetryConfig struct {
+	MaxRetries    int           `config:"max_retries" json:"max_retries"`
+	InitialDelay  time.Duration `config:"initial_delay" json:"initial_delay"`
+	MaxDelay      time.Duration `config:"max_delay" json:"max_delay"`
+	BackoffFactor float64       `config:"backoff_factor" json:"backoff_factor"`
+}
+
+// SharePointSite is one entry of a sites response.
+type SharePointSite struct {
+	ID           string    `json:"id"`
+	Name         string    `json:"name"`
+	DisplayName  string    `json:"displayName"`
+	WebURL       string    `json:"webUrl"`
+	CreatedBy    User      `json:"createdBy"`
+	LastModified time.Time `json:"lastModifiedDateTime"`
+}
+
+// SharePointList is one entry of a lists response.
+type SharePointList struct {
+	ID           string    `json:"id"`
+	Name         string    `json:"name"`
+	DisplayName  string    `json:"displayName"`
+	WebURL       string    `json:"webUrl"`
+	CreatedBy    User      `json:"createdBy"`
+	LastModified time.Time `json:"lastModifiedDateTime"`
+}
+
+// SharePointItem is one entry of a list items response.
+//
+// NOTE(review): items are requested with $expand=fields,driveItem; confirm
+// that name, size, file, folder and parentReference appear at the top level
+// of that payload rather than under driveItem.
+type SharePointItem struct {
+	ID              string          `json:"id"`
+	Name            string          `json:"name"`
+	WebURL          string          `json:"webUrl"`
+	Size            int64           `json:"size"`
+	CreatedBy       User            `json:"createdBy"`
+	LastModified    time.Time       `json:"lastModifiedDateTime"`
+	File            *FileInfo       `json:"file,omitempty"`
+	Folder          *FolderInfo     `json:"folder,omitempty"`
+	ParentReference ParentReference `json:"parentReference"`
+}
+
+// FileInfo is the "file" object of an item; it is absent for non-files.
+type FileInfo struct {
+	MimeType string `json:"mimeType"`
+	Hashes   Hashes `json:"hashes"`
+}
+
+// FolderInfo is the "folder" object of an item; it is absent for non-folders.
+type FolderInfo struct {
+	ChildCount int `json:"childCount"`
+}
+
+// Hashes holds the content hashes reported for a file.
+type Hashes struct {
+	QuickXorHash string `json:"quickXorHash"`
+	SHA1Hash     string `json:"sha1Hash"`
+}
+
+// ParentReference identifies the drive and parent of an item.
+type ParentReference struct {
+	DriveID   string `json:"driveId"`
+	DriveType string `json:"driveType"`
+	ID        string `json:"id"`
+	Path      string `json:"path"`
+}
+
+// User is the identity attached to a resource's createdBy field.
+type User struct {
+	Email       string `json:"email"`
+	ID          string `json:"id"`
+	DisplayName string `json:"displayName"`
+}
+
+// SharePointResponse is an untyped collection response with its next-page link.
+type SharePointResponse struct {
+	Value    []interface{} `json:"value"`
+	NextLink string        `json:"@odata.nextLink"`
+}