Skip to content

fix: Queue messages sent to Continue binary #6457

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ package com.github.continuedev.continueintellijextension.constants

class MessageTypes {
companion object {
val ideMessageTypes = listOf(
val IDE_MESSAGE_TYPES = listOf(
"readRangeInFile",
"isTelemetryEnabled",
"getUniqueId",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,51 +1,40 @@
package com.github.continuedev.continueintellijextension.`continue`

import com.github.continuedev.continueintellijextension.constants.MessageTypes
import com.github.continuedev.continueintellijextension.services.ContinueExtensionSettings
import com.github.continuedev.continueintellijextension.`continue`.process.ContinueBinaryProcess
import com.github.continuedev.continueintellijextension.`continue`.process.ContinueProcessHandler
import com.github.continuedev.continueintellijextension.`continue`.process.ContinueSocketProcess
import com.github.continuedev.continueintellijextension.services.ContinuePluginService
import com.github.continuedev.continueintellijextension.services.TelemetryService
import com.github.continuedev.continueintellijextension.utils.uuid
import com.google.gson.Gson
import com.intellij.openapi.components.service
import com.intellij.openapi.project.Project
import java.io.*
import java.net.Socket
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.Paths
import java.nio.file.attribute.PosixFilePermission
import kotlinx.coroutines.*

class CoreMessenger(
private val project: Project,
continueCorePath: String,
private val ideProtocolClient: IdeProtocolClient,
val coroutineScope: CoroutineScope
val coroutineScope: CoroutineScope,
private val onExit: () -> Unit
) {
private var writer: Writer? = null
private var reader: BufferedReader? = null
private var process: Process? = null
private val gson = Gson()
private val responseListeners = mutableMapOf<String, (Any?) -> Unit>()
private val useTcp: Boolean = System.getenv("USE_TCP")?.toBoolean() ?: false

private fun write(message: String) {
try {
writer?.write(message + "\r\n")
writer?.flush()
} catch (e: Exception) {
println("Error writing to Continue core: $e")
}
}
private val process = startContinueProcess()

fun request(messageType: String, data: Any?, messageId: String?, onResponse: (Any?) -> Unit) {
val id = messageId ?: uuid()
val message =
gson.toJson(mapOf("messageId" to id, "messageType" to messageType, "data" to data))
val message = gson.toJson(mapOf("messageId" to id, "messageType" to messageType, "data" to data))
responseListeners[id] = onResponse
coroutineScope.launch(Dispatchers.IO) {
write(message)
}
process.write(message)
}

private fun startContinueProcess(): ContinueProcessHandler {
val isTcp = System.getenv("USE_TCP")?.toBoolean() ?: false
val process = if (isTcp)
ContinueSocketProcess()
else
ContinueBinaryProcess(onExit)
return ContinueProcessHandler(coroutineScope, process, ::handleMessage)
}

private fun handleMessage(json: String) {
Expand All @@ -55,13 +44,10 @@ class CoreMessenger(
val data = responseMap["data"]

// IDE listeners
if (MessageTypes.ideMessageTypes.contains(messageType)) {
if (MessageTypes.IDE_MESSAGE_TYPES.contains(messageType)) {
ideProtocolClient.handleMessage(json) { data ->
val message =
gson.toJson(
mapOf("messageId" to messageId, "messageType" to messageType, "data" to data)
)
write(message)
val message = gson.toJson(mapOf("messageId" to messageId, "messageType" to messageType, "data" to data))
process.write(message)
}
}

Expand All @@ -82,148 +68,7 @@ class CoreMessenger(
}
}

private fun setPermissions(destination: String) {
val osName = System.getProperty("os.name").toLowerCase()
if (osName.contains("mac") || osName.contains("darwin")) {
ProcessBuilder("xattr", "-dr", "com.apple.quarantine", destination).start().waitFor()
setFilePermissions(destination, "rwxr-xr-x")
} else if (osName.contains("nix") || osName.contains("nux")) {
setFilePermissions(destination, "rwxr-xr-x")
}
}

private fun setFilePermissions(path: String, posixPermissions: String) {
val perms = HashSet<PosixFilePermission>()
if (posixPermissions.contains("r")) perms.add(PosixFilePermission.OWNER_READ)
if (posixPermissions.contains("w")) perms.add(PosixFilePermission.OWNER_WRITE)
if (posixPermissions.contains("x")) perms.add(PosixFilePermission.OWNER_EXECUTE)
Files.setPosixFilePermissions(Paths.get(path), perms)
}

private val exitCallbacks: MutableList<() -> Unit> = mutableListOf()

fun onDidExit(callback: () -> Unit) {
exitCallbacks.add(callback)
}

init {
if (useTcp) {
try {
val socket = Socket("127.0.0.1", 3000)
val writer = PrintWriter(socket.getOutputStream(), true)
this.writer = writer
val reader = BufferedReader(InputStreamReader(socket.getInputStream()))
this.reader = reader

Thread {
try {
while (true) {
val line = reader.readLine()
if (line != null && line.isNotEmpty()) {
try {
handleMessage(line)
} catch (e: Exception) {
println("Error handling message: $line")
println(e)
}
} else {
Thread.sleep(100)
}
}
} catch (e: IOException) {
e.printStackTrace()
} finally {
try {
reader.close()
writer.close()
} catch (e: IOException) {
e.printStackTrace()
}
}
}
.start()
} catch (e: Exception) {
println("TCP Connection Error: Unable to connect to 127.0.0.1:3000")
println("Reason: ${e.message}")
e.printStackTrace()
}
} else {
// Set proper permissions synchronously
runBlocking(Dispatchers.IO) {
setPermissions(continueCorePath)
}

// Start the subprocess
val processBuilder =
ProcessBuilder(continueCorePath).directory(File(continueCorePath).parentFile)
process = processBuilder.start()

val outputStream = process!!.outputStream
val inputStream = process!!.inputStream

writer = OutputStreamWriter(outputStream, StandardCharsets.UTF_8)
reader = BufferedReader(InputStreamReader(inputStream, StandardCharsets.UTF_8))

process!!.onExit().thenRun {
exitCallbacks.forEach { it() }
var err = process?.errorStream?.bufferedReader()?.readText()?.trim()
if (err != null) {
// There are often "⚡️Done in Xms" messages, and we want everything after the last one
val delimiter = "⚡ Done in"
val doneIndex = err.lastIndexOf(delimiter)
if (doneIndex != -1) {
err = err.substring(doneIndex + delimiter.length)
}
}

println("Core process exited with output: $err")

// Log the cause of the failure
val telemetryService = service<TelemetryService>()
telemetryService.capture("jetbrains_core_exit", mapOf("error" to err))

// Clean up all resources
writer?.close()
reader?.close()
process?.destroy()
}

coroutineScope.launch(Dispatchers.IO) {
try {
while (true) {
val line = reader?.readLine()
if (line != null && line.isNotEmpty()) {
try {
handleMessage(line)
} catch (e: Exception) {
println("Error handling message: $line")
println(e)
}
} else {
delay(100)
}
}
} catch (e: IOException) {
e.printStackTrace()
} finally {
try {
reader?.close()
writer?.close()
outputStream.close()
inputStream.close()
process?.destroy()
} catch (e: IOException) {
e.printStackTrace()
}
}
}
}
}

fun killSubProcess() {
process?.isAlive?.let {
exitCallbacks.clear()
process?.destroy()
}
process.close()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,11 @@ package com.github.continuedev.continueintellijextension.`continue`

import com.github.continuedev.continueintellijextension.services.TelemetryService
import com.github.continuedev.continueintellijextension.utils.castNestedOrNull
import com.github.continuedev.continueintellijextension.utils.getContinueBinaryPath
import com.github.continuedev.continueintellijextension.utils.getMachineUniqueID
import com.intellij.openapi.components.service
import com.intellij.openapi.project.Project
import kotlinx.coroutines.*
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.launch

class CoreMessengerManager(
private val project: Project,
Expand All @@ -19,30 +19,27 @@ class CoreMessengerManager(

init {
coroutineScope.launch {
val continueBinaryPath = getContinueBinaryPath()
setupCoreMessenger(continueBinaryPath)
setupCoreMessenger()
}
}

private fun setupCoreMessenger(continueCorePath: String) {
private fun setupCoreMessenger() {
try {
coreMessenger = CoreMessenger(project, continueCorePath, ideProtocolClient, coroutineScope)

coreMessenger?.request("config/getSerializedProfileInfo", null, null) { response ->
val allowAnonymousTelemetry = response.castNestedOrNull<Boolean>("content", "result", "config", "allowAnonymousTelemetry")

val telemetryService = service<TelemetryService>()
if (allowAnonymousTelemetry == true || allowAnonymousTelemetry == null) {
telemetryService.setup(getMachineUniqueID())
}
}

// On exit, use exponential backoff to create another CoreMessenger
coreMessenger?.onDidExit {
coreMessenger = CoreMessenger(project, ideProtocolClient, coroutineScope, onExit = {
lastBackoffInterval *= 2
println("CoreMessenger exited, retrying in $lastBackoffInterval seconds")
Thread.sleep((lastBackoffInterval * 1000).toLong())
setupCoreMessenger(continueCorePath)
setupCoreMessenger()
})

coreMessenger?.request("config/getSerializedProfileInfo", null, null) { response ->
val allowAnonymousTelemetry =
response.castNestedOrNull<Boolean>("content", "result", "config", "allowAnonymousTelemetry")

val telemetryService = service<TelemetryService>()
if (allowAnonymousTelemetry == true || allowAnonymousTelemetry == null) {
telemetryService.setup(getMachineUniqueID())
}
}
} catch (err: Throwable) {
val telemetryService = service<TelemetryService>()
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
package com.github.continuedev.continueintellijextension.`continue`.process

import com.github.continuedev.continueintellijextension.utils.OS
import com.github.continuedev.continueintellijextension.utils.getContinueBinaryPath
import com.github.continuedev.continueintellijextension.utils.getOS
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import java.io.File
import java.io.InputStream
import java.io.OutputStream
import java.nio.file.Files
import java.nio.file.Paths
import java.nio.file.attribute.PosixFilePermission

class ContinueBinaryProcess(private val onExit: () -> Unit) : ContinueProcess {

private val process = startBinaryProcess()
override val input: InputStream = process.inputStream
override val output: OutputStream = process.outputStream

override fun close() =
process.destroy()

private fun startBinaryProcess(): Process {
val path = getContinueBinaryPath()
runBlocking(Dispatchers.IO) {
setPermissions()
}
return ProcessBuilder(path).directory(File(path).parentFile)
.start()
.apply { onExit().thenRun(onExit) }
}

private companion object {

private fun setPermissions() {
val os = getOS()
when (os) {
OS.MAC -> setMacOsPermissions()
OS.WINDOWS -> {}
OS.LINUX -> elevatePermissions()
}
}

private fun setMacOsPermissions() {
ProcessBuilder("xattr", "-dr", "com.apple.quarantine", getContinueBinaryPath()).start().waitFor()
elevatePermissions()
}

private fun elevatePermissions() {
val path = getContinueBinaryPath()
val permissions = setOf(
PosixFilePermission.OWNER_READ,
PosixFilePermission.OWNER_WRITE,
PosixFilePermission.OWNER_EXECUTE
)
Files.setPosixFilePermissions(Paths.get(path), permissions)
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.github.continuedev.continueintellijextension.`continue`.process

import java.io.InputStream
import java.io.OutputStream

interface ContinueProcess {

val input: InputStream
val output: OutputStream

fun close()

}
Loading
Loading