Skip to content

Commit c8b92f5

Browse files
Maren van OtterdijkGavinMendelGleason
andauthored
ensure that openai batch ingest and search server work again (#8)
* Making storage more generic * Adding back in openai comparator * Make storage work for 1536 * notes * Adding gpu version of collation * Nix naming * Rename to search collation * Add core files * Forgot key files! * Fix syntax error * Inherit from Struct * Add ctypes * Add argparse * Add required arguments * Argparse fixes * Ooops * Correct name for index and queues * Read input prefix * ctypes sizeof * Fix ctypes struct * octet length * Interpret as int * Adding file buf to unpack * degenerate tuple * Print pair size * Fix fields * pair size * Try to use unpacking * Iter unpack * Make it fast * Don't read the entire search file in * Read buffer * Read matches * Load the whole thing * Too many reads * Push the vectors in * Skip empty queues * Printing size * Print size * Print range * Add pair size * Fixing size calculations * Fix range * long long * Fix calculation * Try to read it out * Bail condition * mispell * Oops * Alignment issues? * Alignment * Only compare fragment against itself * Running on big gpu machine * Mispelled directory * mention count of vectors * Make it an int * Need it to be a raw * Remapping of ids * Do exactly one vector * Dtype of f32 * 10 vectors * Remove ridiculous length designation * Print sizes * Termination criterion was wrong * Forgot the map over ids * Update correct cuda * Typo * Correct axis * Make a byte array for writeability * Define function in the context of device * Put it on cuda * Get both * Set cuda * typo * Don't use full vectors but come up with an estimate * Don't exit * Make it an int * Oops, need a float * Print on distance * Remove distance print * Writer * JSON * What are these things * Take first element out * Use correct offsets * Write by line * include openai response in BadJson error * Adding server side bug fixes * debug prints * verify lock situation * print more * flip order * try this * unworking code * something * prints * raw embed * write all * byte size * float size * remove debug prints --------- Co-authored-by: Gavin Mendel-Gleason <[email protected]>
1 parent db2ac34 commit c8b92f5

File tree

20 files changed

+631
-154
lines changed

20 files changed

+631
-154
lines changed

flake.nix

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,14 @@
5656
type = "app";
5757
program = "${p.vectorlink-task-monitor}/bin/task-monitor";
5858
};
59+
collation = {
60+
type = "app";
61+
program = "${p.search-collation}/bin/search-collation";
62+
};
63+
collation-server = {
64+
type = "app";
65+
program = "${p.search-collation}/bin/collation-server";
66+
};
5967
vectorize-server = {
6068
type = "app";
6169
program = "${p.vectorlink-vectorize}/bin/vectorize-server";

parallel-hnsw/src/types.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,21 @@ pub enum AbstractVector<'a, T: ?Sized> {
4242
Unstored(&'a T),
4343
}
4444

45+
impl<'a> AbstractVector<'a, Vec<f32>> {
46+
pub fn convert_to_array<const N: usize>(&'a self) -> AbstractVector<'a, [f32; N]> {
47+
match self {
48+
AbstractVector::Stored(id) => AbstractVector::Stored(*id),
49+
AbstractVector::Unstored(v) => {
50+
assert_eq!(v.len(), N);
51+
let slice = &v[..];
52+
let ptr = slice.as_ptr();
53+
let array_borrow: &[f32; N] = unsafe { (ptr as *const [f32; N]).as_ref().unwrap() };
54+
AbstractVector::Unstored(array_borrow)
55+
}
56+
}
57+
}
58+
}
59+
4560
impl<'a, T: ?Sized> AbstractVector<'a, T> {
4661
pub fn convert_into<T2>(&self) -> AbstractVector<'a, T2>
4762
where
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
{ pkgs }:
2+
with pkgs;
3+
with pkgs.python311Packages;
4+
buildPythonPackage rec {
5+
name = "search-collation";
6+
src = ./.;
7+
format = "pyproject";
8+
propagatedBuildInputs = [
9+
poetry-core
10+
numpy
11+
torch
12+
accelerate
13+
sentence-transformers
14+
vectorlink.vectorlink-task-py
15+
];
16+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
{ pkgs }:
2+
with pkgs.python311Packages;
3+
buildPythonPackage rec {
4+
name = "search-collation";
5+
pyproject = true;
6+
src = ./.;
7+
8+
# honestly unsure why this can't be dependencies instead, but this
9+
# works.
10+
propagatedBuildInputs = [
11+
numpy
12+
torch
13+
transformers
14+
accelerate
15+
sentence-transformers
16+
];
17+
18+
nativeBuildInputs = [
19+
poetry-core
20+
];
21+
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
[tool.poetry]
2+
name = "search-collation"
3+
version = "0.1.0"
4+
description = ""
5+
authors = ["Gavin Mendel-Gleason <[email protected]>"]
6+
readme = "README.md"
7+
8+
[tool.poetry.dependencies]
9+
python = "^3.11"
10+
numpy = "^1.26.4"
11+
torch = "^2.2.2"
12+
vectorlink-task = {path = "../../vectorlink-task-py"}
13+
14+
[build-system]
15+
requires = ["poetry-core"]
16+
build-backend = "poetry.core.masonry.api"
17+
18+
[tool.poetry.scripts]
19+
collation = "search_collation.collation:main"
20+
collation-server = "search_collation.collation_etcd:main"

python/search-collation/search_collation/__init__.py

Whitespace-only changes.
Lines changed: 171 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,171 @@
1+
import struct
2+
import argparse
3+
import sys
4+
import numpy
5+
import torch
6+
import csv
7+
import json
8+
9+
def get_offsets(data, i):
10+
position = i * 8
11+
start = struct.unpack_from('<Q', data, position)[0]
12+
end = struct.unpack_from('<Q', data, position + 8)[0]
13+
return (start, end)
14+
15+
if __name__ == '__main__':
16+
parser = argparse.ArgumentParser()
17+
parser.add_argument('-i', '--input-prefix', help='input match file prefix (before .idx or .match) to interpret', required=True)
18+
parser.add_argument('-o', '--output-file', help='output file for reordered match', required=True)
19+
parser.add_argument('-d', '--directory', help='vector files directory', required=True)
20+
parser.add_argument('-f', '--full', help='use full vector distances', action='store_true', default=False)
21+
parser.add_argument('-t', '--threshold', help='threshold value to use to chop distance', type=float, default=1.0)
22+
parser.add_argument('-r', '--report-type', help='the type of report (one of: csv, binary)', choices=['csv', 'binary'], default='csv')
23+
parser.add_argument('-l', '--lines', help='lines file with the actual data', required=True)
24+
parser.add_argument('-x', '--index', help='lines index file', required=True)
25+
args = parser.parse_args()
26+
27+
28+
threshold = float('inf')
29+
if args.threshold:
30+
threshold = args.threshold
31+
32+
# 1. First, load match file.
33+
input_prefix = args.input_prefix
34+
input_file = f"{input_prefix}.queues"
35+
input_index = f"{input_prefix}.index"
36+
37+
pair_size = struct.calcsize("<Qf")
38+
print(f"pair size: {pair_size}")
39+
ulong_size = struct.calcsize("<Q")
40+
print(f"ulong size: {ulong_size}")
41+
# sys.exit(0)
42+
queue_ids = {}
43+
with open(input_index, 'rb') as idx:
44+
idx_buf = idx.read()
45+
ulongs_in_file = int(len(idx_buf) / ulong_size)
46+
with open(input_file, 'rb') as ifile:
47+
for i in range(0, ulongs_in_file - 1):
48+
start = struct.unpack_from("<Q", idx_buf, i * ulong_size)[0]
49+
end = struct.unpack_from("<Q", idx_buf, (i+1) * ulong_size)[0]
50+
#print(f"range: {end}-{start}")
51+
size = int((end - start))
52+
if size == 0:
53+
continue
54+
queue_buf = ifile.read(size)
55+
# Do I need this extra f for alignment?
56+
array = struct.iter_unpack("<Qff", queue_buf)
57+
queue_ids[i] = []
58+
for (vid, distance, _padding) in list(array):
59+
if distance < threshold:
60+
#print(f"distance?: {distance}")
61+
queue_ids[i].append(vid)
62+
63+
if not args.full:
64+
# 2. Alternative branch: we do not need to reorder and can directly output the appropriate matches
65+
# to get real row_id file_id we need to load the whole thing into memory
66+
f = open(args.lines, 'rb')
67+
data = f.read()
68+
x = open(args.index, 'rb')
69+
offsets = x.read()
70+
71+
o = open(args.output_file, 'w')
72+
writer = csv.writer(o)
73+
for i in result:
74+
(i_start, i_end) = get_offsets(offsets, i)
75+
#print(f"i start: {i_start} i_end: {i_end}")
76+
i_json = json.loads(data[i_start:i_end])
77+
i_dfi = i_json['DATAFILE_ID']
78+
i_ri = i_json['ROW_ID']
79+
for j in result[i]:
80+
(j_start, j_end) = get_offsets(offsets, j)
81+
j_json = json.loads(data[j_start:j_end])
82+
j_dfi = j_json['DATAFILE_ID']
83+
j_ri = j_json['ROW_ID']
84+
85+
writer.writerow([i_dfi,i_ri,j_dfi,j_ri])
86+
sys.exit(0)
87+
88+
89+
# 2. Prescan vectors for loading from the match file
90+
# * requires offset calculation for match vector (but not for 0)
91+
ids = []
92+
for key in queue_ids:
93+
ids.append(key)
94+
for i in queue_ids[key]:
95+
ids.append(i)
96+
97+
ids = sorted(set(ids))
98+
99+
id_map = {}
100+
for i in range(0,len(ids)):
101+
id_map[ids[i]] = i
102+
103+
# 3. Preload the vectors into the GPU
104+
vector_file_size = 128370618368
105+
f32_size = struct.calcsize("<f")
106+
vector_size = int(1024 * f32_size) # dimension * f32
107+
vector_file_count = int(vector_file_size / vector_size)
108+
109+
file_no = 0
110+
f = open(f"{args.directory}/{file_no}.vecs", 'rb')
111+
buf = bytearray(b'')
112+
count = 0
113+
for i in ids:
114+
new_file_no = int(i / vector_file_count)
115+
if new_file_no != file_no:
116+
break
117+
# Only comparing against ourselves
118+
#file_no = new_file_no
119+
#f.close()
120+
#f = open(f"{directory}/{file_no}.vecs", 'rb')
121+
file_offset = i % vector_file_count * file_no
122+
f.seek(file_offset * vector_size)
123+
raw_buf = f.read(vector_size)
124+
buf += raw_buf
125+
count += 1
126+
#if count >= 10:
127+
# break
128+
129+
f.close()
130+
tmp = open("/home/ubuntu/raw",'wb')
131+
tmp.write(buf)
132+
tmp.close()
133+
134+
# 4. Perform match calculations and write the output matches as binary structs
135+
#
136+
# The match calculation is a dot product of the match vectors and the candidate
137+
# queue
138+
139+
torch.device("cuda")
140+
import torch._dynamo as dynamo
141+
torch._dynamo.config.verbose = True
142+
torch.backends.cudnn.benchmark = True
143+
144+
def cosine_distance(X, i, ids):
145+
m = torch.index_select(X,0,ids)
146+
mT = torch.transpose(m, 0, 1)
147+
v = torch.index_select(X,0,i)
148+
d = torch.matmul(v, mT)
149+
m_norms = torch.norm(m, dim=1)
150+
v_norm = torch.norm(v, dim=1)
151+
cosine = d / (m_norms * v_norm)
152+
return ( (cosine - 1) / -2)
153+
154+
X = torch.frombuffer(buf, dtype=torch.float32)
155+
X = X.reshape([len(ids), 1024])
156+
compiled_cosine = torch.compile(cosine_distance, mode="max-autotune", fullgraph=True)
157+
158+
# output file setup
159+
o = open(args.output_file, 'wb')
160+
161+
obuf = b''
162+
for i in queue_ids:
163+
ids = queue_ids[i]
164+
I = torch.tensor(list(map(lambda i: id_map[i], ids)))
165+
v_i = torch.tensor([id_map[i]])
166+
results = compiled_cosine(X, v_i, I)
167+
pairs = list(zip(queue_ids[i],results.numpy()[0]))
168+
for pair in pairs:
169+
obuf += struct.pack("<Qff", pair[0], pair[1], 0.0)
170+
171+
o.write(obuf)

python/search-collation/search_collation/collation_etcd.py

Whitespace-only changes.

vectorlink-cross-search/src/handler.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,12 @@ pub struct SearchProgress {
4242

4343
pub struct VectorlinkTaskHandler;
4444

45+
#[repr(C)]
46+
pub struct ResultPair {
47+
vectorid: usize,
48+
distance: f32,
49+
}
50+
4551
#[async_trait]
4652
impl TaskHandler for VectorlinkTaskHandler {
4753
type Init = SearchRequest;
@@ -149,6 +155,7 @@ impl TaskHandler for VectorlinkTaskHandler {
149155

150156
for result in results {
151157
// And now do something with that result
158+
let result = result.map(|(a,b)| {
152159
let data_len = record_len * result.len();
153160
record_offset += data_len;
154161
result_index

vectorlink-search-collation/src/handler.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,10 @@ impl TaskHandler for CollationTaskHandler {
3535
async fn process(
3636
mut _live: TaskLiveness<Self::Init, Self::Progress>,
3737
) -> Result<Self::Complete, Self::Error> {
38+
// Read file from EFS
39+
// Load fragment of vectors in addition to index into memory
40+
// perform matrix product
41+
3842
todo!();
3943
}
4044
}

0 commit comments

Comments
 (0)