-
Notifications
You must be signed in to change notification settings - Fork 620
[WIP] Add historyserver log collector #3973
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
Signed-off-by: KunWuLuan <[email protected]>
// 列举所有包含指定前缀的文件并删除。 | ||
marker := oss.Marker("") | ||
// 如果您仅需要删除src目录及目录下的所有文件,则prefix设置为src/。 | ||
prefix := oss.Prefix(objectDir) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we use english comments?
type StorageWritter interface { | ||
CreateDirectory(path string) error |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we rename StorageWritter
to StorageWriter
?
type StorageWritter interface { | |
CreateDirectory(path string) error | |
type StorageWriter interface { | |
CreateDirectory(path string) error |
} | ||
|
||
// ValidateRayHanderConfig is | ||
func ValidateRayHanderConfig(c *RayCollectorConfig, fldpath *field.Path) field.ErrorList { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we rename ValidateRayHanderConfig
to ValidateRayHandlerConfig
?
c.RayCollectorConfig = *rcc | ||
c.OSSBucket = jd["ossBucket"].(string) | ||
c.OSSBucket = jd["ossEndpoint"].(string) | ||
c.OSSHistoryServerDir = jd["ossHistoryServerDir"].(string) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should we avoid panic? or is this be designed in purpose?
if bucket, ok := jd["ossBucket"].(string); ok {
c.OSSBucket = bucket
} else {
return fmt.Errorf("invalid ossBucket type")
}
Copyright 2024 by the zhangjie [email protected] Authors. | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copyright 2024 by the zhangjie bingyu.zj@alibaba-inc.com Authors. | |
Copyright 2025 by the zhangjie bingyu.zj@alibaba-inc.com Authors. |
if c.Role == "Head" { | ||
if len(c.RayClusterName) == 0 { | ||
allErrs = append(allErrs, field.Invalid(fldpath, c.RayClusterName, "ray_cluster_name must be set")) | ||
} | ||
if len(c.RayClusterID) == 0 { | ||
allErrs = append(allErrs, field.Invalid(fldpath, c.RayClusterID, "ray_cluster_id must be set")) | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can be deleted in my opinion, since it’s checked above.
allErrs = append(allErrs, field.Invalid(fldpath, c.RayClusterID, "ray_cluster_id must be set")) | ||
} | ||
} | ||
return allErrs |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably add validation logic for LogBatching
and PushInterval
?
if c.LogBatching <= 0 {
allErrs = append(allErrs, field.Invalid(...))
}
if c.PushInterval <= 0 {
allErrs = append(allErrs, field.Invalid(...))
}
go 1.24.0 | ||
|
||
require ( | ||
github.com/alibabacloud-go/tea v1.3.10 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we'd better have a new go.mod in the historyserver directory.
subdir, filename := filepath.Split(relativePath) | ||
|
||
if len(subdir) != 0 { | ||
dirName := path.Join(h.RootDir, subdir) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
dirName := path.Join(h.RootDir, subdir) | |
dirName := filepath.Join(h.RootDir, subdir) |
} | ||
} | ||
|
||
objectName := path.Join(h.RootDir, subdir, filename) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
objectName := path.Join(h.RootDir, subdir, filename) | |
objectName := filepath.Join(h.RootDir, subdir, filename) |
objects[i] = object.Key | ||
} | ||
|
||
// 删除对象 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can you use english comments? Same with L108 & L172
buf := bytes.NewBufferString("") | ||
for { | ||
select { | ||
case <-time.Tick(h.PushInterval): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The time.Tick
will create a ticker underneath. Even though it might not cause a leakage after 1.23. Did you mean ?
for {
select {
case <-time.After(h.PushInterval):
...
or
ticker := time.Tick(h.PushInterval)
for {
select {
case <-ticker:
...
|
||
lastPush = now | ||
lines = 0 | ||
buf = bytes.NewBufferString("") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could it use buf.Reset()
after the buffer has been successful written to writer?
nextPos, err = h.Writter.Append(objectName, buf, nextPos) | ||
if err != nil { | ||
logrus.Errorf("Tail file %s to object %s error, append value: %v, nextPos %d error [%v]", | ||
absoluteLogPathName, | ||
objectName, | ||
buf.String(), | ||
nextPos, | ||
err) | ||
return err | ||
} | ||
|
||
now := time.Now() | ||
logrus.Infof("Tail file %s to object %s success, by line count, append value %s, lines %v, interval %v", | ||
absoluteLogPathName, objectName, buf.String(), lines, now.Sub(lastPush).Seconds()) | ||
|
||
lastPush = now | ||
lines = 0 | ||
buf = bytes.NewBufferString("") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Would it possible to put this part of code into a function? It looks similar to the code in the branch above if the lines
is greater than zero.
} | ||
} | ||
} | ||
case _, ok := <-watcher.Errors: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does it skip the error? Is it meaningless?
@@ -0,0 +1,62 @@ | |||
// Package logs is | |||
/* | |||
Copyright 2024 by the zhangjie [email protected] Authors. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Copyright 2024 by the zhangjie bingyu.zj@alibaba-inc.com Authors. | |
Copyright 2025 by the zhangjie bingyu.zj@alibaba-inc.com Authors. |
registry := backend.GetRegistry() | ||
factory, ok := registry[runtimeClassName] | ||
if !ok { | ||
panic("Not supported runtime class name: " + runtimeClassName + " for role: " + role + ".") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Use format string instead
func (c *config) complete(rcc *types.RayCollectorConfig, jd map[string]interface{}) { | ||
c.RayCollectorConfig = *rcc | ||
c.OSSBucket = jd["ossBucket"].(string) | ||
c.OSSBucket = jd["ossEndpoint"].(string) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be c. OSSEndpoint = jd["ossEndpoint"].(string)
?
Signed-off-by: KunWuLuan <[email protected]>
Signed-off-by: KunWuLuan <[email protected]>
3e6f7e7
to
dbd4bee
Compare
Signed-off-by: KunWuLuan <[email protected]>
dbd4bee
to
57edeac
Compare
Why are these changes needed?
Related issue number
To #3966
Checks