Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions requirements/dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,7 @@ typing-inspection==0.4.2 \
# via
# -c requirements/main.txt
# pydantic
virtualenv==20.35.3 \
--hash=sha256:4f1a845d131133bdff10590489610c98c168ff99dc75d6c96853801f7f67af44 \
--hash=sha256:63d106565078d8c8d0b206d48080f938a8b25361e19432d2c9db40d2899c810a
virtualenv==20.35.4 \
--hash=sha256:643d3914d73d3eeb0c552cbb12d7e82adf0e504dbf86a3182f8771a153a1971c \
--hash=sha256:c21c9cede36c9753eeade68ba7d523529f228a403463376cf821eaae2b650f1b
# via pre-commit
223 changes: 125 additions & 98 deletions requirements/main.txt

Large diffs are not rendered by default.

14 changes: 5 additions & 9 deletions src/obsloctap/consumekafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,8 @@ def convert_predicted_kafka(msg: dict) -> list[Obsplan]:
plan.append(p)
if len(plan) < max:
log.info(
"Predicted message says {max} targets but only "
"{len(plan)} have t_planning>0"
f"Predicted message says {max} targets but only "
f"{len(plan)} have t_planning>0"
)
return plan

Expand All @@ -127,7 +127,6 @@ def unpack_value(value: Any, schema: dict) -> dict:


def unpack_message(msg: ConsumerRecord) -> dict:
log.info(f"Unpack kafka message {msg.timestamp}")
value = msg.value # bytes from Kafka message
magic = value[0]
assert magic == 0, "Not Confluent Avro wire format"
Expand All @@ -137,14 +136,11 @@ def unpack_message(msg: ConsumerRecord) -> dict:


async def process_message(msg: ConsumerRecord) -> None:
log.info(
f"Processing kafka - {msg['topic']} {msg['timestamp']} "
f"index: {[msg['salIndex']]}"
)
if msg["salIndex"] != config.salIndex:
log.info(f"Processing kafka - {msg.key} {msg.timestamp} ")
record = unpack_message(msg)
if record["salIndex"] != config.salIndex:
log.info(f"Skipping message - salIndex not {config.salIndex}")
return
record = unpack_message(msg)
# log.debug(record)
plan = convert_predicted_kafka(record)
log.info(
Expand Down
11 changes: 9 additions & 2 deletions src/obsloctap/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,11 @@ async def update_entries(
async def insert_exposure(
self, exp: Exposure, session: AsyncSession
) -> None:
if not exp.band:
log.warning(
f"{exp.exposure_id} has no band - "
f"will insert 'other:pinhole'" # noqa: E231 em_max
)
"""put in an obsplan line based on an exposure
this is when consdb has an observation but it does not match
any planned item"""
Expand All @@ -248,8 +253,10 @@ async def insert_exposure(
f"{exp.obs_end_mjd}, " # t_max
f"{exp.obs_end_mjd - exp.obs_start_mjd}, " # t_exptime
f"15, " # t_resolution
f"'{spectral_ranges[exp.band][0]}', " # em_min
f"'{spectral_ranges[exp.band][1]}', " # em_max
f"'{spectral_ranges[
exp.band or 'other:pinhole'][0]}', " # noqa: E231 em_min
f"'{spectral_ranges[
exp.band or 'other:pinhole'][1]}', " # noqa: E231 em_max
f"0, " # em_res_power
f"'phot.flux.density', " # o_ucd
f"'', " # pol_states
Expand Down
7 changes: 6 additions & 1 deletion src/obsloctap/schedule24h.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,10 +48,11 @@ def __init__(self) -> None:

def get_schedule24(self) -> DataFrame:
"""
Get he 24 schedule form the rubin_sim api call
Get the 24 schedule from the rubin_sim api call
Need these:
os.environ["LSST_DISABLE_BUCKET_VALIDATION"] = "1"
os.environ["S3_ENDPOINT_URL"] = "https://s3dfrgw.slac.stanford.edu/"
AWS credentials
Returns DataFrame of schedule entries
-------
"""
Expand All @@ -62,6 +63,10 @@ def get_schedule24(self) -> DataFrame:
except TypeError:
log.info("Dropping to 1 night for sim")
visits = sim_archive.fetch_obsloctap_visits(nights=1)
except ValueError as ve:
log.warning(f"Error encountered while fetching visits {ve}")
return []

if type(visits) is not DataFrame:
visits = DataFrame(visits)
log.info(f"Got {visits.size} for 24 hour schedule")
Expand Down
5 changes: 4 additions & 1 deletion tests/DbTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,5 +201,8 @@ async def test_insert_exposure(self) -> None:
exp = exps[0]
await dbhelp.insert_exposure(exp, dbhelp.get_session())

exp = exps[1]
exp.band = None # test this
await dbhelp.insert_exposure(exp, dbhelp.get_session())
plans2 = len(await dbhelp.get_schedule(time=0))
assert plans2 == plans + 1
assert plans2 == plans + 2
10 changes: 8 additions & 2 deletions tests/PredictedScheduleTest.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,16 @@ def test_convert(self) -> None:

def test_msg(self) -> None:
# schema = get_schema()
with open("tests/predicted_message.pkl", "rb") as f:
with open("tests/message_mt.pkl", "rb") as f:
msg = pickle.load(f)
assert msg
# plan = unpack_message(msg, schema)
with open("tests/schema2191.pkl", "rb") as s:
schema = pickle.load(s)
print(f"testing kafka - {msg.timestamp} " f"index: {msg.key}")
rec = unpack_value(msg.value, schema)
self.assertEqual(rec["salIndex"], config.salIndex)
plan = convert_predicted_kafka(rec)
self.assertEqual(19, len(plan), " Seem to not get all lines")

@pytest.mark.asyncio
async def test_exp_updates(self) -> None:
Expand Down
Binary file added tests/message_mt.pkl
Binary file not shown.
7 changes: 4 additions & 3 deletions tests/testutils.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,18 +29,19 @@ async def dump_msg() -> None:
schema = pickle.load(s)
try:
schema = get_schema(schema_id)
except Exception:
print(f"failed to get schems {Exception}")
except Exception as e:
print(f"failed to get schemas {e}")

msg_dict = unpack_value(value, schema)
if msg_dict["salIndex"] == 1:
with open(f"schema{schema_id}.pkl", "wb") as f:
pickle.dump(schema, f)
with open(msg_fn, "wb") as f:
print(f"Dumping message {msg_fn}")
pickle.dump(msg, f)
break
else:
print(f"Ignoring message {msg_dict['salIndex']}")
print(f"Ignoring message salIndex {msg_dict['salIndex']}")
await consumer.stop()


Expand Down
Loading