diff --git a/core/api/kotlinx-io-core.klib.api b/core/api/kotlinx-io-core.klib.api index 02d9050a7..3d2b6d5a1 100644 --- a/core/api/kotlinx-io-core.klib.api +++ b/core/api/kotlinx-io-core.klib.api @@ -1,5 +1,6 @@ // Klib ABI Dump // Targets: [androidNativeArm32, androidNativeArm64, androidNativeX64, androidNativeX86, iosArm64, iosSimulatorArm64, iosX64, js, linuxArm32Hfp, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, wasmJs, wasmWasi, watchosArm32, watchosArm64, watchosDeviceArm64, watchosSimulatorArm64, watchosX64] +// Alias: native => [androidNativeArm32, androidNativeArm64, androidNativeX64, androidNativeX86, iosArm64, iosSimulatorArm64, iosX64, linuxArm32Hfp, linuxArm64, linuxX64, macosArm64, macosX64, mingwX64, tvosArm64, tvosSimulatorArm64, tvosX64, watchosArm32, watchosArm64, watchosDeviceArm64, watchosSimulatorArm64, watchosX64] // Alias: apple => [iosArm64, iosSimulatorArm64, iosX64, macosArm64, macosX64, tvosArm64, tvosSimulatorArm64, tvosX64, watchosArm32, watchosArm64, watchosDeviceArm64, watchosSimulatorArm64, watchosX64] // Rendering settings: // - Signature version: 2 @@ -300,6 +301,15 @@ final inline fun (kotlinx.io.unsafe/SegmentReadContext).kotlinx.io.unsafe/withDa final inline fun (kotlinx.io/Sink).kotlinx.io/writeToInternalBuffer(kotlin/Function1) // kotlinx.io/writeToInternalBuffer|writeToInternalBuffer@kotlinx.io.Sink(kotlin.Function1){}[0] final inline fun <#A: kotlin/Any?> (kotlinx.io/Buffer).kotlinx.io/seek(kotlin/Long, kotlin/Function2): #A // kotlinx.io/seek|seek@kotlinx.io.Buffer(kotlin.Long;kotlin.Function2){0ยง}[0] +// Targets: [native] +final fun (kotlinx.io/Sink).kotlinx.io/write(kotlinx.cinterop/CPointer>, kotlin/Long) // kotlinx.io/write|write@kotlinx.io.Sink(kotlinx.cinterop.CPointer>;kotlin.Long){}[0] + +// Targets: [native] +final fun (kotlinx.io/Source).kotlinx.io/readAtMostTo(kotlinx.cinterop/CPointer>, kotlin/Long): kotlin/Long // kotlinx.io/readAtMostTo|readAtMostTo@kotlinx.io.Source(kotlinx.cinterop.CPointer>;kotlin.Long){}[0] + +// Targets: [native] +final fun (kotlinx.io/Source).kotlinx.io/readTo(kotlinx.cinterop/CPointer>, kotlin/Long) // kotlinx.io/readTo|readTo@kotlinx.io.Source(kotlinx.cinterop.CPointer>;kotlin.Long){}[0] + // Targets: [apple] final fun (kotlinx.io/Sink).kotlinx.io/asNSOutputStream(): platform.Foundation/NSOutputStream // kotlinx.io/asNSOutputStream|asNSOutputStream@kotlinx.io.Sink(){}[0] diff --git a/core/native/src/SinksNative.kt b/core/native/src/SinksNative.kt new file mode 100644 index 000000000..cf84fb639 --- /dev/null +++ b/core/native/src/SinksNative.kt @@ -0,0 +1,45 @@ +/* + * Copyright 2010-2025 JetBrains s.r.o. and respective authors and developers. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. + */ + +package kotlinx.io + +import kotlinx.cinterop.* +import kotlinx.io.unsafe.UnsafeBufferOperations +import platform.posix.memcpy + +/** + * Writes exactly [byteCount] bytes from a memory pointed by [ptr] into this [Sink](this). + * + * **Note that this function does not verify whether the [ptr] points to a readable memory region.** + * + * @param ptr The memory region to read data from. + * @param byteCount The number of bytes that should be written into this sink from [ptr]. + * + * @throws IllegalArgumentException when [byteCount] is negative. + * @throws IOException when some I/O error happens. + */ +@DelicateIoApi +@OptIn(ExperimentalForeignApi::class, UnsafeIoApi::class, InternalIoApi::class, UnsafeNumber::class) +public fun Sink.write(ptr: CPointer, byteCount: Long) { + require(byteCount >= 0L) { "byteCount shouldn't be negative: $byteCount" } + + var remaining = byteCount + var currentOffset = 0L + + while (remaining > 0) { + UnsafeBufferOperations.writeToTail(buffer, 1) { array, startIndex, endIndex -> + val toWrite = minOf(endIndex - startIndex, remaining).toInt() + array.usePinned { pinned -> + memcpy(pinned.addressOf(startIndex), ptr + currentOffset, toWrite.convert()) + } + currentOffset += toWrite + remaining -= toWrite + + toWrite + } + + hintEmit() + } +} diff --git a/core/native/src/SourcesNative.kt b/core/native/src/SourcesNative.kt new file mode 100644 index 000000000..c6c538d50 --- /dev/null +++ b/core/native/src/SourcesNative.kt @@ -0,0 +1,91 @@ +/* + * Copyright 2010-2025 JetBrains s.r.o. and respective authors and developers. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. + */ + +package kotlinx.io + +import kotlinx.cinterop.* +import kotlinx.io.unsafe.UnsafeBufferOperations +import platform.posix.memcpy + +/** + * Reads at most [byteCount] bytes from this [Source](this), writes them into [ptr] and returns the number of + * bytes read. + * + * **Note that this function does not verify whether the [ptr] points to a writeable memory region.** + * + * @param ptr The memory region to write data into. + * @param byteCount The maximum number of bytes to read from this source. + * + * @throws IllegalArgumentException when [byteCount] is negative. + * @throws IOException when some I/O error happens. + */ +@DelicateIoApi +@OptIn(ExperimentalForeignApi::class, InternalIoApi::class, UnsafeIoApi::class, UnsafeNumber::class) +public fun Source.readAtMostTo(ptr: CPointer, byteCount: Long): Long { + require(byteCount >= 0L) { "byteCount shouldn't be negative: $byteCount" } + + if (byteCount == 0L) return 0L + + if (!request(1L)) { + return if (exhausted()) -1L else 0L + } + + var consumed = 0L + UnsafeBufferOperations.readFromHead(buffer) { array, startIndex, endIndex -> + val toRead = minOf(endIndex - startIndex, byteCount).toInt() + + array.usePinned { + memcpy(ptr, it.addressOf(startIndex), toRead.convert()) + } + + consumed += toRead + toRead + } + + return consumed +} + +/** + * Reads exactly [byteCount] bytes from this [Source](this) and writes them into a memory region pointed by [ptr]. + * + * **Note that this function does not verify whether the [ptr] points to a writeable memory region.** + * + * This function consumes data from the source even if an error occurs. + * + * @param ptr The memory region to write data into. + * @param byteCount The exact number of bytes to read from this source. + * + * @throws IllegalArgumentException when [byteCount] is negative. + * @throws EOFException when the source exhausts before [byteCount] were read. + * @throws IOException when some I/O error happens. + */ +@DelicateIoApi +@OptIn(ExperimentalForeignApi::class, InternalIoApi::class, UnsafeIoApi::class, UnsafeNumber::class) +public fun Source.readTo(ptr: CPointer, byteCount: Long) { + require(byteCount >= 0L) { "byteCount shouldn't be negative: $byteCount" } + + if (byteCount == 0L) return + + var consumed = 0L + + while (consumed < byteCount) { + if (!request(1L)) { + if (exhausted()) { + throw EOFException("The source is exhausted before reading $byteCount bytes " + + "(it contained only $consumed bytes)") + } + } + UnsafeBufferOperations.readFromHead(buffer) { array, startIndex, endIndex -> + val toRead = minOf(endIndex - startIndex, byteCount - consumed).toInt() + + array.usePinned { + memcpy(ptr + consumed, it.addressOf(startIndex), toRead.convert()) + } + + consumed += toRead + toRead + } + } +} diff --git a/core/native/test/SinksNativeTest.kt b/core/native/test/SinksNativeTest.kt new file mode 100644 index 000000000..e4d08a4b2 --- /dev/null +++ b/core/native/test/SinksNativeTest.kt @@ -0,0 +1,87 @@ +/* + * Copyright 2010-2025 JetBrains s.r.o. and respective authors and developers. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. + */ + +package kotlinx.io + +import kotlinx.cinterop.* +import kotlin.test.* + +class BufferSinksNativeTest : SinksNativeTest(SinkFactory.BUFFER) +class BufferedSinkSinksNativeTest : SinksNativeTest(SinkFactory.REAL_BUFFERED_SINK) + +private const val SEGMENT_SIZE = Segment.SIZE + +@OptIn(ExperimentalForeignApi::class, DelicateIoApi::class) +abstract class SinksNativeTest internal constructor(factory: SinkFactory) { + private val buffer = Buffer() + private val sink = factory.create(buffer) + + @Test + fun writePointer() { + val data = "hello world".encodeToByteArray() + + data.usePinned { pinned -> + sink.write(pinned.addressOf(0), data.size.toLong()) + } + sink.flush() + assertEquals("hello world", buffer.readString()) + + data.usePinned { pinned -> + sink.write(pinned.addressOf(0), 5) + } + sink.flush() + assertEquals("hello", buffer.readString()) + + data.usePinned { pinned -> + sink.write(pinned.addressOf(6), 5) + } + sink.flush() + assertEquals("world", buffer.readString()) + + data.usePinned { pinned -> + sink.write(pinned.addressOf(0), 0) + } + sink.flush() + assertTrue(buffer.exhausted()) + } + + @Test + fun writeOnSegmentsBorder() { + val data = "hello world".encodeToByteArray() + val padding = ByteArray(SEGMENT_SIZE - 3) { 0xaa.toByte() } + + sink.write(padding) + data.usePinned { pinned -> + sink.write(pinned.addressOf(0), data.size.toLong()) + } + sink.flush() + + buffer.skip(padding.size.toLong()) + assertEquals("hello world", buffer.readString()) + } + + @Test + fun writeOverMultipleSegments() { + val data = ByteArray((2.5 * SEGMENT_SIZE).toInt()) { 0xaa.toByte() } + + data.usePinned { pinned -> + sink.write(pinned.addressOf(0), data.size.toLong()) + } + sink.flush() + + assertContentEquals(data, buffer.readByteArray()) + } + + @Test + fun writeUsingIllegalLength() { + byteArrayOf(0).usePinned { pinned -> + val ptr = pinned.addressOf(0) + + assertFailsWith { + sink.write(ptr, byteCount = -1L) + } + } + } +} diff --git a/core/native/test/SourcesNativeTest.kt b/core/native/test/SourcesNativeTest.kt new file mode 100644 index 000000000..cd3c8f2dc --- /dev/null +++ b/core/native/test/SourcesNativeTest.kt @@ -0,0 +1,207 @@ +/* + * Copyright 2010-2025 JetBrains s.r.o. and respective authors and developers. + * Use of this source code is governed by the Apache 2.0 license that can be found in the LICENCE file. + */ + +package kotlinx.io + +import kotlinx.cinterop.ExperimentalForeignApi +import kotlinx.cinterop.addressOf +import kotlinx.cinterop.usePinned +import kotlin.test.* + + +class BufferSourcesNativeTest : SourcesNativeTest(SourceFactory.BUFFER) +class BufferedSourceSourcesNativeTest : SourcesNativeTest(SourceFactory.REAL_BUFFERED_SOURCE) +class OneByteAtATimeBufferedSourceSourcesNativeTest : SourcesNativeTest(SourceFactory.ONE_BYTE_AT_A_TIME_BUFFERED_SOURCE) +class OneByteAtATimeBufferSourcesNativeTest : SourcesNativeTest(SourceFactory.ONE_BYTE_AT_A_TIME_BUFFER) +class PeekBufferSourcesNativeTest : SourcesNativeTest(SourceFactory.PEEK_BUFFER) +class PeekBufferedSourceSourcesNativeTest : SourcesNativeTest(SourceFactory.PEEK_BUFFERED_SOURCE) + +private const val SEGMENT_SIZE = Segment.SIZE + +@OptIn(ExperimentalForeignApi::class, DelicateIoApi::class) +abstract class SourcesNativeTest internal constructor(private val factory: SourceFactory) { + private val pipe = factory.pipe() + private val source = pipe.source + private val sink = pipe.sink + + @Test + fun readAtMost() { + if (factory.isOneByteAtATime) return + + val hello = "hello world" + val dst = ByteArray(128) + + sink.apply { + writeString(hello) + emit() + } + + dst.usePinned { pinned -> + val bytesRead = source.readAtMostTo(pinned.addressOf(0), dst.size.toLong()) + assertEquals(hello.length.toLong(), bytesRead) + + assertEquals("hello world", dst.copyOfRange(0, hello.length).decodeToString()) + assertTrue(dst.copyOfRange(hello.length, dst.size).all { it == 0.toByte() }) + } + dst.fill(0) + + sink.apply { + writeString(hello) + emit() + } + val helloLength = "hello".length + dst.usePinned { pinned -> + val bytesRead = source.readAtMostTo(pinned.addressOf(0), helloLength.toLong()) + assertEquals(helloLength.toLong(), bytesRead) + + assertEquals("hello", dst.copyOfRange(0, helloLength).decodeToString()) + assertTrue(dst.copyOfRange(helloLength, dst.size).all { it == 0.toByte() }) + } + assertEquals(" world", source.readString()) + dst.fill(0) + + sink.apply { + writeString(hello) + emit() + } + dst.usePinned { pinned -> + assertEquals(0, source.readAtMostTo(pinned.addressOf(0), 0)) + } + assertEquals(hello, source.readString()) + } + + @Test + fun readAtMostOneAtATime() { + if (!factory.isOneByteAtATime) return + + val hello = "hello world" + val dst = ByteArray(128) + + sink.apply { + writeString(hello) + emit() + } + + dst.usePinned { pinned -> + val bytesRead = source.readAtMostTo(pinned.addressOf(0), dst.size.toLong()) + assertEquals(1L, bytesRead) + + assertEquals('h'.code.toByte(), dst[0]) + assertTrue(dst.copyOfRange(1, dst.size).all { it == 0.toByte() }) + } + assertEquals("ello world", source.readString()) + dst.fill(0) + + sink.apply { + writeString(hello) + emit() + } + dst.usePinned { pinned -> + assertEquals(0, source.readAtMostTo(pinned.addressOf(0), 0)) + } + assertEquals(hello, source.readString()) + } + + @Test + fun readAtMostFromMultipleSegments() { + if (factory.isOneByteAtATime) return + + sink.write(ByteArray(SEGMENT_SIZE * 3)) + sink.emit() + + val dst = ByteArray(SEGMENT_SIZE * 3) + dst.usePinned { pinned -> + assertEquals(SEGMENT_SIZE.toLong(), source.readAtMostTo(pinned.addressOf(0), dst.size.toLong())) + } + source.transferTo(discardingSink()) + } + + @Test + fun readAtMostFromExhaustedSource() { + val dst = ByteArray(128) + dst.usePinned { pinned -> + assertEquals(-1L, source.readAtMostTo(pinned.addressOf(0), dst.size.toLong())) + } + } + + @Test + fun readAtMostWithIllegalLength() { + val buffer = byteArrayOf(0) + + buffer.usePinned { pinned -> + val ptr = pinned.addressOf(0) + + assertFailsWith { + source.readAtMostTo(ptr, byteCount = -1L) + } + } + } + + @Test + fun readTo() { + val hello = "hello world" + val dst = ByteArray(128) + + sink.writeString(hello) + sink.emit() + + dst.usePinned { pinned -> + source.readTo(pinned.addressOf(0), hello.length.toLong()) + assertEquals(hello, dst.copyOfRange(0, hello.length).decodeToString()) + } + dst.fill(0) + + sink.writeString(hello) + sink.emit() + + dst.usePinned { pinned -> + source.readTo(pinned.addressOf(0), 5) + assertEquals("hello", dst.copyOfRange(0, 5).decodeToString()) + } + assertEquals(" world", source.readString()) + + sink.writeString(hello) + assertFailsWith { + dst.usePinned { pinned -> + source.readTo(pinned.addressOf(0), dst.size.toLong()) + } + } + } + + @Test + fun readToMultipleSegments() { + val data = ByteArray((2.5 * SEGMENT_SIZE).toInt()) { it.toByte() } + val dst = ByteArray(data.size) + + sink.write(data) + sink.emit() + + dst.usePinned { pinned -> + source.readTo(pinned.addressOf(0), data.size.toLong()) + } + + assertContentEquals(data, dst) + } + + @Test + fun readToFromExhaustedSource() { + val dst = ByteArray(128) + assertFailsWith { + dst.usePinned { pinned -> + source.readTo(pinned.addressOf(0), 1L) + } + } + } + + @Test + fun readToWithIllegalLength() { + val buffer = byteArrayOf(0) + assertFailsWith { + buffer.usePinned { pinned -> + source.readTo(pinned.addressOf(0), byteCount = -1L) + } + } + } +}