Skip to content

Commit 74ff183

Browse files
author
Robin VAN DE MERGHEL
committed
feat: Add pilot legacy logging
1 parent 9c7d387 commit 74ff183

File tree

3 files changed

+109
-88
lines changed

3 files changed

+109
-88
lines changed

Pilot/dirac-pilot.py

Lines changed: 16 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -63,25 +63,30 @@
6363
# print the buffer, so we have a "classic' logger back in sync.
6464
sys.stdout.write(bufContent)
6565
# now the remote logger.
66-
remote = pilotParams.pilotLogging and (pilotParams.loggerURL is not None)
67-
if remote:
66+
if pilotParams.pilotLogging:
6867
# In a remote logger enabled Dirac version we would have some classic logger content from a wrapper,
6968
# which we passed in:
7069
receivedContent = ""
7170
if not sys.stdin.isatty():
7271
receivedContent = sys.stdin.read()
7372
log = RemoteLogger(
7473
pilotParams.loggerURL,
75-
"Pilot",
74+
useServerCertificate=pilotParams.useServerCertificate,
75+
name="Pilot",
7676
bufsize=pilotParams.loggerBufsize,
7777
pilotUUID=pilotParams.pilotUUID,
7878
debugFlag=pilotParams.debugFlag,
79-
wnVO=pilotParams.wnVO,
8079
)
8180
log.info("Remote logger activated")
82-
log.buffer.write(receivedContent)
81+
log.buffer.write(log.format_to_json(
82+
"INFO",
83+
receivedContent,
84+
))
8385
log.buffer.flush()
84-
log.buffer.write(bufContent)
86+
log.buffer.write(log.format_to_json(
87+
"INFO",
88+
bufContent,
89+
))
8590
else:
8691
log = Logger("Pilot", debugFlag=pilotParams.debugFlag)
8792

@@ -103,7 +108,7 @@
103108
log.info("Requested command extensions: %s" % str(pilotParams.commandExtensions))
104109

105110
log.info("Executing commands: %s" % str(pilotParams.commands))
106-
111+
remote = pilotParams.pilotLogging
107112
if remote:
108113
# It's safer to cancel the timer here. Each command has got its own logger object with a timer cancelled by the
109114
# finaliser. No need for a timer in the "else" code segment below.
@@ -122,5 +127,8 @@
122127
log.error("Command %s could not be instantiated" % commandName)
123128
# send the last message and abandon ship.
124129
if remote:
125-
log.buffer.flush()
130+
log.buffer.flush(force=True)
126131
sys.exit(-1)
132+
133+
log.info("Pilot tasks finished.")
134+
log.buffer.flush(force=True)

Pilot/pilotCommands.py

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,20 +88,29 @@ def wrapper(self):
8888

8989
except SystemExit as exCode: # or Exception ?
9090
# controlled exit
91+
if self.pp.pilotLogging:
92+
try:
93+
self.log.error(str(exCode))
94+
self.log.error(traceback.format_exc())
95+
self.log.buffer.flush(force=True)
96+
except Exception as exc:
97+
self.log.error("Remote logger couldn't be finalised %s " % str(exc))
98+
raise
99+
100+
# If we don't have a remote logger
91101
pRef = self.pp.pilotReference
92102
self.log.info(
93-
"Flushing the remote logger buffer for pilot on sys.exit(): %s (exit code:%s)" % (pRef, str(exCode))
103+
"Flushing the logger buffer for pilot on sys.exit(): %s (exit code:%s)" % (pRef, str(exCode))
94104
)
95105
self.log.buffer.flush() # flush the buffer unconditionally (on sys.exit()).
96-
try:
97-
sendMessage(self.log.url, self.log.pilotUUID, self.log.wnVO, "finaliseLogs", {"retCode": str(exCode)})
98-
except Exception as exc:
99-
self.log.error("Remote logger couldn't be finalised %s " % str(exc))
100-
raise
106+
101107
except Exception as exc:
102108
# unexpected exit: document it and bail out.
103-
self.log.error(str(exc))
104-
self.log.error(traceback.format_exc())
109+
if self.pp.pilotLogging:
110+
# Force flush if it's a remote logger
111+
self.log.buffer.flush(force=True)
112+
else:
113+
self.log.buffer.flush()
105114
raise
106115
finally:
107116
self.log.buffer.cancelTimer()

Pilot/pilotTools.py

Lines changed: 76 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -518,15 +518,15 @@ class RemoteLogger(Logger):
518518

519519
def __init__(
520520
self,
521-
url,
521+
url, # Not used yet
522+
useServerCertificate,
522523
name="Pilot",
523524
debugFlag=False,
524525
pilotOutput="pilot.out",
525526
isPilotLoggerOn=True,
526527
pilotUUID="unknown",
527528
flushInterval=10,
528529
bufsize=1000,
529-
wnVO="unknown",
530530
):
531531
"""
532532
c'tor
@@ -538,34 +538,48 @@ def __init__(
538538
self.pilotUUID = pilotUUID
539539
self.wnVO = wnVO
540540
self.isPilotLoggerOn = isPilotLoggerOn
541-
sendToURL = partial(sendMessage, url, pilotUUID, wnVO, "sendMessage")
541+
sendToURL = partial(sendMessage, useServerCertificate, pilotUUID)
542542
self.buffer = FixedSizeBuffer(sendToURL, bufsize=bufsize, autoflush=flushInterval)
543543

544-
def debug(self, msg, header=True, _sendPilotLog=False):
545-
# TODO: Send pilot log remotely?
544+
def format_to_json(self, level, message):
545+
546+
escaped = json.dumps(message)[1:-1] # remove outer quotes
547+
548+
# Split on escaped newlines
549+
splitted_message = escaped.split("\\n")
550+
551+
output = []
552+
for mess in splitted_message:
553+
if mess:
554+
output.append({
555+
"timestamp": datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%S.%fZ"),
556+
"severity": level,
557+
"message": mess,
558+
"scope": self.name
559+
})
560+
return output
561+
562+
def debug(self, msg, header=True):
546563
super(RemoteLogger, self).debug(msg, header)
547564
if (
548565
self.isPilotLoggerOn and self.debugFlag
549566
): # the -d flag activates this debug flag in CommandBase via PilotParams
550-
self.sendMessage(self.messageTemplate.format(level="DEBUG", message=msg))
567+
self.sendMessage(self.format_to_json(level="DEBUG", message=msg))
551568

552-
def error(self, msg, header=True, _sendPilotLog=False):
553-
# TODO: Send pilot log remotely?
569+
def error(self, msg, header=True):
554570
super(RemoteLogger, self).error(msg, header)
555571
if self.isPilotLoggerOn:
556-
self.sendMessage(self.messageTemplate.format(level="ERROR", message=msg))
572+
self.sendMessage(self.format_to_json(level="ERROR", message=msg))
557573

558-
def warn(self, msg, header=True, _sendPilotLog=False):
559-
# TODO: Send pilot log remotely?
574+
def warn(self, msg, header=True):
560575
super(RemoteLogger, self).warn(msg, header)
561576
if self.isPilotLoggerOn:
562-
self.sendMessage(self.messageTemplate.format(level="WARNING", message=msg))
577+
self.sendMessage(self.format_to_json(level="WARNING", message=msg))
563578

564-
def info(self, msg, header=True, _sendPilotLog=False):
565-
# TODO: Send pilot log remotely?
579+
def info(self, msg, header=True):
566580
super(RemoteLogger, self).info(msg, header)
567581
if self.isPilotLoggerOn:
568-
self.sendMessage(self.messageTemplate.format(level="INFO", message=msg))
582+
self.sendMessage(self.format_to_json(level="INFO", message=msg))
569583

570584
def sendMessage(self, msg):
571585
"""
@@ -577,7 +591,7 @@ def sendMessage(self, msg):
577591
:rtype: None
578592
"""
579593
try:
580-
self.buffer.write(msg + "\n")
594+
self.buffer.write(msg)
581595
except Exception as err:
582596
super(RemoteLogger, self).error("Message not sent")
583597
super(RemoteLogger, self).error(str(err))
@@ -622,34 +636,31 @@ def __init__(self, senderFunc, bufsize=1000, autoflush=10):
622636
self._timer.start()
623637
else:
624638
self._timer = None
625-
self.output = StringIO()
639+
self.output = []
626640
self.bufsize = bufsize
627641
self._nlines = 0
628642
self.senderFunc = senderFunc
629643

630644
@synchronized
631-
def write(self, text):
645+
def write(self, content_json):
632646
"""
633647
Write text to a string buffer. Newline characters are counted and number of lines in the buffer
634648
is increased accordingly.
635649
636-
:param text: text string to write
637-
:type text: str
650+
:param content_json: Json to send, format following format_to_json
651+
:type content_json: list[dict]
638652
:return: None
639653
:rtype: None
640654
"""
641-
# reopen the buffer in a case we had to flush a partially filled buffer
642-
if self.output.closed:
643-
self.output = StringIO()
644-
self.output.write(text)
645-
self._nlines += max(1, text.count("\n"))
655+
656+
self.output.extend(content_json)
657+
658+
try:
659+
self._nlines += max(1, len(content_json))
660+
except Exception:
661+
raise ValueError(content_json)
646662
self.sendFullBuffer()
647663

648-
@synchronized
649-
def getValue(self):
650-
content = self.output.getvalue()
651-
return content
652-
653664
@synchronized
654665
def sendFullBuffer(self):
655666
"""
@@ -659,22 +670,19 @@ def sendFullBuffer(self):
659670

660671
if self._nlines >= self.bufsize:
661672
self.flush()
662-
self.output = StringIO()
673+
self.output = []
663674

664675
@synchronized
665-
def flush(self):
676+
def flush(self, force=False):
666677
"""
667678
Flush the buffer and send log records to a remote server. The buffer is closed as well.
668679
669680
:return: None
670681
:rtype: None
671682
"""
672-
if not self.output.closed and self._nlines > 0:
673-
self.output.flush()
674-
buf = self.getValue()
675-
self.senderFunc(buf)
683+
if force or (self.output and self._nlines > 0):
684+
self.senderFunc(self.output)
676685
self._nlines = 0
677-
self.output.close()
678686

679687
def cancelTimer(self):
680688
"""
@@ -687,40 +695,32 @@ def cancelTimer(self):
687695
self._timer.cancel()
688696

689697

690-
def sendMessage(url, pilotUUID, wnVO, method, rawMessage):
691-
"""
692-
Invoke a remote method on a Tornado server and pass a JSON message to it.
693-
694-
:param str url: Server URL
695-
:param str pilotUUID: pilot unique ID
696-
:param str wnVO: VO name, relevant only if not contained in a proxy
697-
:param str method: a method to be invoked
698-
:param str rawMessage: a message to be sent, in JSON format
699-
:return: None.
700-
"""
701-
caPath = os.getenv("X509_CERT_DIR")
702-
cert = os.getenv("X509_USER_PROXY")
703-
704-
context = ssl.create_default_context()
705-
context.load_verify_locations(capath=caPath)
698+
def sendMessage(useServerCertificate, pilotUUID, rawMessage = []):
699+
cfg = []
700+
if useServerCertificate:
701+
cfg.append("-o /DIRAC/Security/UseServerCertificate=yes")
706702

707-
message = json.dumps((json.dumps(rawMessage), pilotUUID, wnVO))
703+
formatted_logs = json.dumps(rawMessage)
704+
705+
# Escape single quotes in JSON string for safe shell quoting
706+
safe_logs = formatted_logs.replace("'", "'\\''")
708707

709-
try:
710-
context.load_cert_chain(cert) # this is a proxy
711-
raw_data = {"method": method, "args": message}
712-
except IsADirectoryError: # assuming it'a dir containing cert and key
713-
context.load_cert_chain(os.path.join(cert, "hostcert.pem"), os.path.join(cert, "hostkey.pem"))
714-
raw_data = {"method": method, "args": message, "extraCredentials": '"hosts"'}
715-
716-
if sys.version_info.major == 3:
717-
data = urlencode(raw_data).encode("utf-8") # encode to bytes ! for python3
718-
else:
719-
# Python2
720-
data = urlencode(raw_data)
708+
cmd = "dirac-admin-send-pilot-logs %s '%s' %s -d" % (
709+
pilotUUID,
710+
safe_logs,
711+
" ".join(cfg),
712+
)
721713

722-
res = urlopen(url, data, context=context)
723-
res.close()
714+
FNULL = open(os.devnull, 'w')
715+
_p = subprocess.Popen(
716+
cmd,
717+
shell=True,
718+
stdout=FNULL,
719+
stderr=FNULL,
720+
close_fds=False
721+
)
722+
_p.wait()
723+
FNULL.close()
724724

725725

726726
class CommandBase(object):
@@ -750,12 +750,12 @@ def __init__(self, pilotParams):
750750
# remote logger
751751
self.log = RemoteLogger(
752752
loggerURL,
753-
self.__class__.__name__,
753+
useServerCertificate=pilotParams.useServerCertificate,
754+
name=self.__class__.__name__,
754755
pilotUUID=pilotParams.pilotUUID,
755756
debugFlag=self.debugFlag,
756757
flushInterval=interval,
757758
bufsize=bufsize,
758-
wnVO=pilotParams.wnVO,
759759
)
760760

761761
self.log.isPilotLoggerOn = isPilotLoggerOn
@@ -805,8 +805,12 @@ def executeAndGetOutput(self, cmd, environDict=None):
805805
else:
806806
sys.stdout.write(outChunk)
807807
sys.stdout.flush()
808-
if hasattr(self.log, "buffer") and self.log.isPilotLoggerOn:
809-
self.log.buffer.write(outChunk)
808+
if hasattr(self.log, "url"):
809+
# It's a remote logger
810+
self.log.buffer.write(self.log.format_to_json( # type: ignore
811+
"COMMAND",
812+
outChunk
813+
))
810814
outData += outChunk
811815
# If no data was read on any of the pipes then the process has finished
812816
if not dataWasRead:

0 commit comments

Comments
 (0)