<html><head><meta name="color-scheme" content="light dark"></head><body><pre style="word-wrap: break-word; white-space: pre-wrap;"># Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.

"""
UNTESTED:
read_message
"""

import sys
import sysconfig

import pytest

import pyarrow as pa
import numpy as np


cuda = pytest.importorskip("pyarrow.cuda")

platform = sysconfig.get_platform()
# TODO: enable ppc64 when Arrow C++ supports IPC in ppc64 systems:
has_ipc_support = platform == 'linux-x86_64'  # or 'ppc64' in platform

cuda_ipc = pytest.mark.skipif(
    not has_ipc_support,
    reason='CUDA IPC not supported in platform `%s`' % (platform))

global_context = None  # for flake8
global_context1 = None  # for flake8


def setup_module(module):
    module.global_context = cuda.Context(0)
    module.global_context1 = cuda.Context(cuda.Context.get_num_devices() - 1)


def teardown_module(module):
    del module.global_context


def test_Context():
    assert cuda.Context.get_num_devices() &gt; 0
    assert global_context.device_number == 0
    assert global_context1.device_number == cuda.Context.get_num_devices() - 1

    with pytest.raises(ValueError,
                       match=("device_number argument must "
                              "be non-negative less than")):
        cuda.Context(cuda.Context.get_num_devices())


@pytest.mark.parametrize("size", [0, 1, 1000])
def test_manage_allocate_free_host(size):
    buf = cuda.new_host_buffer(size)
    arr = np.frombuffer(buf, dtype=np.uint8)
    arr[size//4:3*size//4] = 1
    arr_cp = arr.copy()
    arr2 = np.frombuffer(buf, dtype=np.uint8)
    np.testing.assert_equal(arr2, arr_cp)
    assert buf.size == size


def test_context_allocate_del():
    bytes_allocated = global_context.bytes_allocated
    cudabuf = global_context.new_buffer(128)
    assert global_context.bytes_allocated == bytes_allocated + 128
    del cudabuf
    assert global_context.bytes_allocated == bytes_allocated


def make_random_buffer(size, target='host'):
    """Return a host or device buffer with random data.
    """
    if target == 'host':
        assert size &gt;= 0
        buf = pa.allocate_buffer(size)
        assert buf.size == size
        arr = np.frombuffer(buf, dtype=np.uint8)
        assert arr.size == size
        arr[:] = np.random.randint(low=1, high=255, size=size, dtype=np.uint8)
        assert arr.sum() &gt; 0 or size == 0
        arr_ = np.frombuffer(buf, dtype=np.uint8)
        np.testing.assert_equal(arr, arr_)
        return arr, buf
    elif target == 'device':
        arr, buf = make_random_buffer(size, target='host')
        dbuf = global_context.new_buffer(size)
        assert dbuf.size == size
        dbuf.copy_from_host(buf, position=0, nbytes=size)
        return arr, dbuf
    raise ValueError('invalid target value')


@pytest.mark.parametrize("size", [0, 1, 1000])
def test_context_device_buffer(size):
    # Creating device buffer from host buffer;
    arr, buf = make_random_buffer(size)
    cudabuf = global_context.buffer_from_data(buf)
    assert cudabuf.size == size
    arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr, arr2)

    # CudaBuffer does not support buffer protocol
    with pytest.raises(BufferError):
        memoryview(cudabuf)

    # Creating device buffer from array:
    cudabuf = global_context.buffer_from_data(arr)
    assert cudabuf.size == size
    arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr, arr2)

    # Creating device buffer from bytes:
    cudabuf = global_context.buffer_from_data(arr.tobytes())
    assert cudabuf.size == size
    arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr, arr2)

    # Creating a device buffer from another device buffer, view:
    cudabuf2 = cudabuf.slice(0, cudabuf.size)
    assert cudabuf2.size == size
    arr2 = np.frombuffer(cudabuf2.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr, arr2)

    if size &gt; 1:
        cudabuf2.copy_from_host(arr[size//2:])
        arr3 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
        np.testing.assert_equal(np.concatenate((arr[size//2:], arr[size//2:])),
                                arr3)
        cudabuf2.copy_from_host(arr[:size//2])  # restoring arr

    # Creating a device buffer from another device buffer, copy:
    cudabuf2 = global_context.buffer_from_data(cudabuf)
    assert cudabuf2.size == size
    arr2 = np.frombuffer(cudabuf2.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr, arr2)

    cudabuf2.copy_from_host(arr[size//2:])
    arr3 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr, arr3)

    # Slice of a device buffer
    cudabuf2 = cudabuf.slice(0, cudabuf.size+10)
    assert cudabuf2.size == size
    arr2 = np.frombuffer(cudabuf2.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr, arr2)

    cudabuf2 = cudabuf.slice(size//4, size+10)
    assert cudabuf2.size == size - size//4
    arr2 = np.frombuffer(cudabuf2.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr[size//4:], arr2)

    # Creating a device buffer from a slice of host buffer
    soffset = size//4
    ssize = 2*size//4
    cudabuf = global_context.buffer_from_data(buf, offset=soffset,
                                              size=ssize)
    assert cudabuf.size == ssize
    arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr[soffset:soffset + ssize], arr2)

    cudabuf = global_context.buffer_from_data(buf.slice(offset=soffset,
                                                        length=ssize))
    assert cudabuf.size == ssize
    arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr[soffset:soffset + ssize], arr2)

    # Creating a device buffer from a slice of an array
    cudabuf = global_context.buffer_from_data(arr, offset=soffset, size=ssize)
    assert cudabuf.size == ssize
    arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr[soffset:soffset + ssize], arr2)

    cudabuf = global_context.buffer_from_data(arr[soffset:soffset+ssize])
    assert cudabuf.size == ssize
    arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr[soffset:soffset + ssize], arr2)

    # Creating a device buffer from a slice of bytes
    cudabuf = global_context.buffer_from_data(arr.tobytes(),
                                              offset=soffset,
                                              size=ssize)
    assert cudabuf.size == ssize
    arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr[soffset:soffset + ssize], arr2)

    # Creating a device buffer from size
    cudabuf = global_context.new_buffer(size)
    assert cudabuf.size == size

    # Creating device buffer from a slice of another device buffer:
    cudabuf = global_context.buffer_from_data(arr)
    cudabuf2 = cudabuf.slice(soffset, ssize)
    assert cudabuf2.size == ssize
    arr2 = np.frombuffer(cudabuf2.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr[soffset:soffset+ssize], arr2)

    # Creating device buffer from HostBuffer

    buf = cuda.new_host_buffer(size)
    arr_ = np.frombuffer(buf, dtype=np.uint8)
    arr_[:] = arr
    cudabuf = global_context.buffer_from_data(buf)
    assert cudabuf.size == size
    arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr, arr2)

    # Creating device buffer from HostBuffer slice

    cudabuf = global_context.buffer_from_data(buf, offset=soffset, size=ssize)
    assert cudabuf.size == ssize
    arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr[soffset:soffset+ssize], arr2)

    cudabuf = global_context.buffer_from_data(
        buf.slice(offset=soffset, length=ssize))
    assert cudabuf.size == ssize
    arr2 = np.frombuffer(cudabuf.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr[soffset:soffset+ssize], arr2)


@pytest.mark.parametrize("size", [0, 1, 1000])
def test_context_from_object(size):
    ctx = global_context
    arr, cbuf = make_random_buffer(size, target='device')
    dtype = arr.dtype

    # Creating device buffer from a CUDA host buffer
    hbuf = cuda.new_host_buffer(size * arr.dtype.itemsize)
    np.frombuffer(hbuf, dtype=dtype)[:] = arr
    cbuf2 = ctx.buffer_from_object(hbuf)
    assert cbuf2.size == cbuf.size
    arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
    np.testing.assert_equal(arr, arr2)

    # Creating device buffer from a device buffer
    cbuf2 = ctx.buffer_from_object(cbuf2)
    assert cbuf2.size == cbuf.size
    arr2 = np.frombuffer(cbuf2.copy_to_host(), dtype=dtype)
    np.testing.assert_equal(arr, arr2)

    # Trying to create a device buffer from a Buffer
    with pytest.raises(pa.ArrowTypeError,
                       match=('buffer is not backed by a CudaBuffer')):
        ctx.buffer_from_object(pa.py_buffer(b"123"))

    # Trying to create a device buffer from numpy.array
    with pytest.raises(pa.ArrowTypeError,
                       match=("cannot create device buffer view from "
                              ".* \'numpy.ndarray\'")):
        ctx.buffer_from_object(np.array([1, 2, 3]))


def test_foreign_buffer():
    ctx = global_context
    dtype = np.dtype(np.uint8)
    size = 10
    hbuf = cuda.new_host_buffer(size * dtype.itemsize)

    # test host buffer memory reference counting
    rc = sys.getrefcount(hbuf)
    fbuf = ctx.foreign_buffer(hbuf.address, hbuf.size, hbuf)
    assert sys.getrefcount(hbuf) == rc + 1
    del fbuf
    assert sys.getrefcount(hbuf) == rc

    # test postponed deallocation of host buffer memory
    fbuf = ctx.foreign_buffer(hbuf.address, hbuf.size, hbuf)
    del hbuf
    fbuf.copy_to_host()

    # test deallocating the host buffer memory making it inaccessible
    hbuf = cuda.new_host_buffer(size * dtype.itemsize)
    fbuf = ctx.foreign_buffer(hbuf.address, hbuf.size)
    del hbuf
    with pytest.raises(pa.ArrowIOError,
                       match=('Cuda error ')):
        fbuf.copy_to_host()


@pytest.mark.parametrize("size", [0, 1, 1000])
def test_CudaBuffer(size):
    arr, buf = make_random_buffer(size)
    assert arr.tobytes() == buf.to_pybytes()
    cbuf = global_context.buffer_from_data(buf)
    assert cbuf.size == size
    assert not cbuf.is_cpu
    assert arr.tobytes() == cbuf.to_pybytes()
    if size &gt; 0:
        assert cbuf.address &gt; 0

    for i in range(size):
        assert cbuf[i] == arr[i]

    for s in [
            slice(None),
            slice(size//4, size//2),
    ]:
        assert cbuf[s].to_pybytes() == arr[s].tobytes()

    sbuf = cbuf.slice(size//4, size//2)
    assert sbuf.parent == cbuf

    with pytest.raises(TypeError,
                       match="Do not call CudaBuffer's constructor directly"):
        cuda.CudaBuffer()


@pytest.mark.parametrize("size", [0, 1, 1000])
def test_HostBuffer(size):
    arr, buf = make_random_buffer(size)
    assert arr.tobytes() == buf.to_pybytes()
    hbuf = cuda.new_host_buffer(size)
    np.frombuffer(hbuf, dtype=np.uint8)[:] = arr
    assert hbuf.size == size
    assert hbuf.is_cpu
    assert arr.tobytes() == hbuf.to_pybytes()
    for i in range(size):
        assert hbuf[i] == arr[i]
    for s in [
            slice(None),
            slice(size//4, size//2),
    ]:
        assert hbuf[s].to_pybytes() == arr[s].tobytes()

    sbuf = hbuf.slice(size//4, size//2)
    assert sbuf.parent == hbuf

    del hbuf

    with pytest.raises(TypeError,
                       match="Do not call HostBuffer's constructor directly"):
        cuda.HostBuffer()


@pytest.mark.parametrize("size", [0, 1, 1000])
def test_copy_from_to_host(size):
    # Create a buffer in host containing range(size)
    dt = np.dtype('uint16')
    nbytes = size * dt.itemsize
    buf = pa.allocate_buffer(nbytes, resizable=True)  # in host
    assert isinstance(buf, pa.Buffer)
    assert not isinstance(buf, cuda.CudaBuffer)
    arr = np.frombuffer(buf, dtype=dt)
    assert arr.size == size
    arr[:] = range(size)
    arr_ = np.frombuffer(buf, dtype=dt)
    np.testing.assert_equal(arr, arr_)

    # Create a device buffer of the same size and copy from host
    device_buffer = global_context.new_buffer(nbytes)
    assert isinstance(device_buffer, cuda.CudaBuffer)
    assert isinstance(device_buffer, pa.Buffer)
    assert device_buffer.size == nbytes
    assert not device_buffer.is_cpu
    device_buffer.copy_from_host(buf, position=0, nbytes=nbytes)

    # Copy back to host and compare contents
    buf2 = device_buffer.copy_to_host(position=0, nbytes=nbytes)
    arr2 = np.frombuffer(buf2, dtype=dt)
    np.testing.assert_equal(arr, arr2)


@pytest.mark.parametrize("size", [0, 1, 1000])
def test_copy_to_host(size):
    arr, dbuf = make_random_buffer(size, target='device')

    buf = dbuf.copy_to_host()
    assert buf.is_cpu
    np.testing.assert_equal(arr, np.frombuffer(buf, dtype=np.uint8))

    buf = dbuf.copy_to_host(position=size//4)
    assert buf.is_cpu
    np.testing.assert_equal(arr[size//4:], np.frombuffer(buf, dtype=np.uint8))

    buf = dbuf.copy_to_host(position=size//4, nbytes=size//8)
    assert buf.is_cpu
    np.testing.assert_equal(arr[size//4:size//4+size//8],
                            np.frombuffer(buf, dtype=np.uint8))

    buf = dbuf.copy_to_host(position=size//4, nbytes=0)
    assert buf.is_cpu
    assert buf.size == 0

    for (position, nbytes) in [
        (size+2, -1), (-2, -1), (size+1, 0), (-3, 0),
    ]:
        with pytest.raises(ValueError,
                           match='position argument is out-of-range'):
            dbuf.copy_to_host(position=position, nbytes=nbytes)

    for (position, nbytes) in [
        (0, size+1), (size//2, (size+1)//2+1), (size, 1)
    ]:
        with pytest.raises(ValueError,
                           match=('requested more to copy than'
                                  ' available from device buffer')):
            dbuf.copy_to_host(position=position, nbytes=nbytes)

    buf = pa.allocate_buffer(size//4)
    dbuf.copy_to_host(buf=buf)
    np.testing.assert_equal(arr[:size//4], np.frombuffer(buf, dtype=np.uint8))

    if size &lt; 12:
        return

    dbuf.copy_to_host(buf=buf, position=12)
    np.testing.assert_equal(arr[12:12+size//4],
                            np.frombuffer(buf, dtype=np.uint8))

    dbuf.copy_to_host(buf=buf, nbytes=12)
    np.testing.assert_equal(arr[:12], np.frombuffer(buf, dtype=np.uint8)[:12])

    dbuf.copy_to_host(buf=buf, nbytes=12, position=6)
    np.testing.assert_equal(arr[6:6+12],
                            np.frombuffer(buf, dtype=np.uint8)[:12])

    for (position, nbytes) in [
            (0, size+10), (10, size-5),
            (0, size//2), (size//4, size//4+1)
    ]:
        with pytest.raises(ValueError,
                           match=('requested copy does not '
                                  'fit into host buffer')):
            dbuf.copy_to_host(buf=buf, position=position, nbytes=nbytes)


@pytest.mark.parametrize("dest_ctx", ['same', 'another'])
@pytest.mark.parametrize("size", [0, 1, 1000])
def test_copy_from_device(dest_ctx, size):
    arr, buf = make_random_buffer(size=size, target='device')
    lst = arr.tolist()
    if dest_ctx == 'another':
        dest_ctx = global_context1
        if buf.context.device_number == dest_ctx.device_number:
            pytest.skip("not a multi-GPU system")
    else:
        dest_ctx = buf.context
    dbuf = dest_ctx.new_buffer(size)

    def put(*args, **kwargs):
        dbuf.copy_from_device(buf, *args, **kwargs)
        rbuf = dbuf.copy_to_host()
        return np.frombuffer(rbuf, dtype=np.uint8).tolist()
    assert put() == lst
    if size &gt; 4:
        assert put(position=size//4) == lst[:size//4]+lst[:-size//4]
        assert put() == lst
        assert put(position=1, nbytes=size//2) == \
            lst[:1] + lst[:size//2] + lst[-(size-size//2-1):]

    for (position, nbytes) in [
            (size+2, -1), (-2, -1), (size+1, 0), (-3, 0),
    ]:
        with pytest.raises(ValueError,
                           match='position argument is out-of-range'):
            put(position=position, nbytes=nbytes)

    for (position, nbytes) in [
        (0, size+1),
    ]:
        with pytest.raises(ValueError,
                           match=('requested more to copy than'
                                  ' available from device buffer')):
            put(position=position, nbytes=nbytes)

    if size &lt; 4:
        return

    for (position, nbytes) in [
        (size//2, (size+1)//2+1)
    ]:
        with pytest.raises(ValueError,
                           match=('requested more to copy than'
                                  ' available in device buffer')):
            put(position=position, nbytes=nbytes)


@pytest.mark.parametrize("size", [0, 1, 1000])
def test_copy_from_host(size):
    arr, buf = make_random_buffer(size=size, target='host')
    lst = arr.tolist()
    dbuf = global_context.new_buffer(size)

    def put(*args, **kwargs):
        dbuf.copy_from_host(buf, *args, **kwargs)
        rbuf = dbuf.copy_to_host()
        return np.frombuffer(rbuf, dtype=np.uint8).tolist()
    assert put() == lst
    if size &gt; 4:
        assert put(position=size//4) == lst[:size//4]+lst[:-size//4]
        assert put() == lst
        assert put(position=1, nbytes=size//2) == \
            lst[:1] + lst[:size//2] + lst[-(size-size//2-1):]

    for (position, nbytes) in [
            (size+2, -1), (-2, -1), (size+1, 0), (-3, 0),
    ]:
        with pytest.raises(ValueError,
                           match='position argument is out-of-range'):
            put(position=position, nbytes=nbytes)

    for (position, nbytes) in [
        (0, size+1),
    ]:
        with pytest.raises(ValueError,
                           match=('requested more to copy than'
                                  ' available from host buffer')):
            put(position=position, nbytes=nbytes)

    if size &lt; 4:
        return

    for (position, nbytes) in [
        (size//2, (size+1)//2+1)
    ]:
        with pytest.raises(ValueError,
                           match=('requested more to copy than'
                                  ' available in device buffer')):
            put(position=position, nbytes=nbytes)


def test_BufferWriter():
    def allocate(size):
        cbuf = global_context.new_buffer(size)
        writer = cuda.BufferWriter(cbuf)
        return cbuf, writer

    def test_writes(total_size, chunksize, buffer_size=0):
        cbuf, writer = allocate(total_size)
        arr, buf = make_random_buffer(size=total_size, target='host')

        if buffer_size &gt; 0:
            writer.buffer_size = buffer_size

        position = writer.tell()
        assert position == 0
        writer.write(buf.slice(length=chunksize))
        assert writer.tell() == chunksize
        writer.seek(0)
        position = writer.tell()
        assert position == 0

        while position &lt; total_size:
            bytes_to_write = min(chunksize, total_size - position)
            writer.write(buf.slice(offset=position, length=bytes_to_write))
            position += bytes_to_write

        writer.flush()
        assert cbuf.size == total_size
        cbuf.context.synchronize()
        buf2 = cbuf.copy_to_host()
        cbuf.context.synchronize()
        assert buf2.size == total_size
        arr2 = np.frombuffer(buf2, dtype=np.uint8)
        np.testing.assert_equal(arr, arr2)

    total_size, chunk_size = 1 &lt;&lt; 16, 1000
    test_writes(total_size, chunk_size)
    test_writes(total_size, chunk_size, total_size // 16)

    cbuf, writer = allocate(100)
    writer.write(np.arange(100, dtype=np.uint8))
    writer.writeat(50, np.arange(25, dtype=np.uint8))
    writer.write(np.arange(25, dtype=np.uint8))
    writer.flush()

    arr = np.frombuffer(cbuf.copy_to_host(), np.uint8)
    np.testing.assert_equal(arr[:50], np.arange(50, dtype=np.uint8))
    np.testing.assert_equal(arr[50:75], np.arange(25, dtype=np.uint8))
    np.testing.assert_equal(arr[75:], np.arange(25, dtype=np.uint8))


def test_BufferWriter_edge_cases():
    # edge cases, see cuda-test.cc for more information:
    size = 1000
    cbuf = global_context.new_buffer(size)
    writer = cuda.BufferWriter(cbuf)
    arr, buf = make_random_buffer(size=size, target='host')

    assert writer.buffer_size == 0
    writer.buffer_size = 100
    assert writer.buffer_size == 100

    writer.write(buf.slice(length=0))
    assert writer.tell() == 0

    writer.write(buf.slice(length=10))
    writer.buffer_size = 200
    assert writer.buffer_size == 200
    assert writer.num_bytes_buffered == 0

    writer.write(buf.slice(offset=10, length=300))
    assert writer.num_bytes_buffered == 0

    writer.write(buf.slice(offset=310, length=200))
    assert writer.num_bytes_buffered == 0

    writer.write(buf.slice(offset=510, length=390))
    writer.write(buf.slice(offset=900, length=100))

    writer.flush()

    buf2 = cbuf.copy_to_host()
    assert buf2.size == size
    arr2 = np.frombuffer(buf2, dtype=np.uint8)
    np.testing.assert_equal(arr, arr2)


def test_BufferReader():
    size = 1000
    arr, cbuf = make_random_buffer(size=size, target='device')

    reader = cuda.BufferReader(cbuf)
    reader.seek(950)
    assert reader.tell() == 950

    data = reader.read(100)
    assert len(data) == 50
    assert reader.tell() == 1000

    reader.seek(925)
    arr2 = np.zeros(100, dtype=np.uint8)
    n = reader.readinto(arr2)
    assert n == 75
    assert reader.tell() == 1000
    np.testing.assert_equal(arr[925:], arr2[:75])

    reader.seek(0)
    assert reader.tell() == 0
    buf2 = reader.read_buffer()
    arr2 = np.frombuffer(buf2.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr, arr2)


def test_BufferReader_zero_size():
    arr, cbuf = make_random_buffer(size=0, target='device')
    reader = cuda.BufferReader(cbuf)
    reader.seek(0)
    data = reader.read()
    assert len(data) == 0
    assert reader.tell() == 0
    buf2 = reader.read_buffer()
    arr2 = np.frombuffer(buf2.copy_to_host(), dtype=np.uint8)
    np.testing.assert_equal(arr, arr2)


def make_recordbatch(length):
    schema = pa.schema([pa.field('f0', pa.int16()),
                        pa.field('f1', pa.int16())])
    a0 = pa.array(np.random.randint(0, 255, size=length, dtype=np.int16))
    a1 = pa.array(np.random.randint(0, 255, size=length, dtype=np.int16))
    batch = pa.record_batch([a0, a1], schema=schema)
    return batch


def test_batch_serialize():
    batch = make_recordbatch(10)
    hbuf = batch.serialize()
    cbuf = cuda.serialize_record_batch(batch, global_context)

    # Test that read_record_batch works properly
    cbatch = cuda.read_record_batch(cbuf, batch.schema)
    assert isinstance(cbatch, pa.RecordBatch)
    assert batch.schema == cbatch.schema
    assert batch.num_columns == cbatch.num_columns
    assert batch.num_rows == cbatch.num_rows

    # Deserialize CUDA-serialized batch on host
    buf = cbuf.copy_to_host()
    assert hbuf.equals(buf)
    batch2 = pa.ipc.read_record_batch(buf, batch.schema)
    assert hbuf.equals(batch2.serialize())

    assert batch.num_columns == batch2.num_columns
    assert batch.num_rows == batch2.num_rows
    assert batch.column(0).equals(batch2.column(0))
    assert batch.equals(batch2)


def make_table():
    a0 = pa.array([0, 1, 42, None], type=pa.int16())
    a1 = pa.array([[0, 1], [2], [], None], type=pa.list_(pa.int32()))
    a2 = pa.array([("ab", True), ("cde", False), (None, None), None],
                  type=pa.struct([("strs", pa.utf8()),
                                  ("bools", pa.bool_())]))
    # Dictionaries are validated on the IPC read path, but that can produce
    # issues for GPU-located dictionaries.  Check that they work fine.
    a3 = pa.DictionaryArray.from_arrays(
        indices=[0, 1, 1, None],
        dictionary=pa.array(['foo', 'bar']))
    a4 = pa.DictionaryArray.from_arrays(
        indices=[2, 1, 2, None],
        dictionary=a1)
    a5 = pa.DictionaryArray.from_arrays(
        indices=[2, 1, 0, None],
        dictionary=a2)

    arrays = [a0, a1, a2, a3, a4, a5]
    schema = pa.schema([('f{}'.format(i), arr.type)
                        for i, arr in enumerate(arrays)])
    batch = pa.record_batch(arrays, schema=schema)
    table = pa.Table.from_batches([batch])
    return table


def make_table_cuda():
    htable = make_table()
    # Serialize the host table to bytes
    sink = pa.BufferOutputStream()
    with pa.ipc.new_stream(sink, htable.schema) as out:
        out.write_table(htable)
    hbuf = pa.py_buffer(sink.getvalue().to_pybytes())

    # Copy the host bytes to a device buffer
    dbuf = global_context.new_buffer(len(hbuf))
    dbuf.copy_from_host(hbuf, nbytes=len(hbuf))
    # Deserialize the device buffer into a Table
    dtable = pa.ipc.open_stream(cuda.BufferReader(dbuf)).read_all()
    return hbuf, htable, dbuf, dtable


def test_table_deserialize():
    # ARROW-9659: make sure that we can deserialize a GPU-located table
    # without crashing when initializing or validating the underlying arrays.
    hbuf, htable, dbuf, dtable = make_table_cuda()
    # Assert basic fields the same between host and device tables
    assert htable.schema == dtable.schema
    assert htable.num_rows == dtable.num_rows
    assert htable.num_columns == dtable.num_columns
    # Assert byte-level equality
    assert hbuf.equals(dbuf.copy_to_host())
    # Copy DtoH and assert the tables are still equivalent
    assert htable.equals(pa.ipc.open_stream(
        dbuf.copy_to_host()
    ).read_all())


def test_create_table_with_device_buffers():
    # ARROW-11872: make sure that we can create an Arrow Table from
    # GPU-located Arrays without crashing.
    hbuf, htable, dbuf, dtable = make_table_cuda()
    # Construct a new Table from the device Table
    dtable2 = pa.Table.from_arrays(dtable.columns, dtable.column_names)
    # Assert basic fields the same between host and device tables
    assert htable.schema == dtable2.schema
    assert htable.num_rows == dtable2.num_rows
    assert htable.num_columns == dtable2.num_columns
    # Assert byte-level equality
    assert hbuf.equals(dbuf.copy_to_host())
    # Copy DtoH and assert the tables are still equivalent
    assert htable.equals(pa.ipc.open_stream(
        dbuf.copy_to_host()
    ).read_all())


def other_process_for_test_IPC(handle_buffer, expected_arr):
    other_context = pa.cuda.Context(0)
    ipc_handle = pa.cuda.IpcMemHandle.from_buffer(handle_buffer)
    ipc_buf = other_context.open_ipc_buffer(ipc_handle)
    ipc_buf.context.synchronize()
    buf = ipc_buf.copy_to_host()
    assert buf.size == expected_arr.size, repr((buf.size, expected_arr.size))
    arr = np.frombuffer(buf, dtype=expected_arr.dtype)
    np.testing.assert_equal(arr, expected_arr)


@cuda_ipc
@pytest.mark.parametrize("size", [0, 1, 1000])
def test_IPC(size):
    import multiprocessing
    ctx = multiprocessing.get_context('spawn')
    arr, cbuf = make_random_buffer(size=size, target='device')
    ipc_handle = cbuf.export_for_ipc()
    handle_buffer = ipc_handle.serialize()
    p = ctx.Process(target=other_process_for_test_IPC,
                    args=(handle_buffer, arr))
    p.start()
    p.join()
    assert p.exitcode == 0
</pre></body></html>