| Line 53: |
Line 53: |
| | | | |
| | import time | | import time |
| − | import numpy as np | + | import random |
| | import keyboard | | import keyboard |
| − | import random
| + | from pylsl import StreamInfo, StreamOutlet |
| − | from pylsl import StreamInfo, StreamOutlet, local_clock | |
| − | from LSLsettings import MarkerStreamSettings, DynamicMultiChannelStreamSettings
| |
| | | | |
| − | print(f"Setting up streams...")
| + | from pylsl import local_clock |
| − | marker_settings = MarkerStreamSettings()
| |
| | | | |
| − | # info marker StreamInfo, outlet | + | # --- Setup Marker Stream --- |
| | + | print("Setting up marker stream...") |
| | marker_info = StreamInfo( | | marker_info = StreamInfo( |
| − | name=marker_settings.stream_name, | + | name="MarkerStream", |
| − | type=marker_settings.stream_type, | + | type="Markers", |
| − | channel_count=marker_settings.channel_count, | + | channel_count=1, |
| − | nominal_srate=marker_settings.sample_rate, | + | nominal_srate=0, # Irregular sampling |
| − | channel_format=marker_settings.channel_format, | + | channel_format="string", |
| − | source_id=marker_settings.source_id | + | source_id="MarkerSource" |
| | ) | | ) |
| − | marker_outlet = StreamOutlet(marker_info, chunk_size=marker_settings.push_chunk_size) | + | marker_outlet = StreamOutlet(marker_info, chunk_size=1) |
| | | | |
| − | data_settings = DynamicMultiChannelStreamSettings(
| + | # --- Setup Data Stream --- |
| − | n_channels=4,
| + | print("Setting up data stream...") |
| − | stream_name="TestStream", | + | data_info = StreamInfo( |
| − | stream_type="TestData", | + | name="TestStream", |
| − | sample_rate=100, | + | type="TestData", |
| − | channel_names=["A", "B", "C", "D"] | + | channel_count=4, |
| | + | nominal_srate=100, |
| | + | channel_format="float32", |
| | + | source_id="TestStream_Source" |
| | ) | | ) |
| | | | |
| − | # info data StreamInfo, outlet
| + | channels = data_info.desc().append_child("channels") |
| − | data_info = StreamInfo( | + | for name in ["A", "B", "C", "D"]: |
| − | name=data_settings.stream_name,
| + | chan = channels.append_child("channel") |
| − | type=data_settings.stream_type,
| + | chan.append_child_value("name", name) |
| − | channel_count=data_settings.channel_count, | + | chan.append_child_value("unit", "unitless") |
| − | nominal_srate=data_settings.sample_rate, | + | chan.append_child_value("type", "TestData") |
| − | channel_format=data_settings.channel_format, | |
| − | source_id=data_settings.source_id | |
| − | )
| |
| − | data_outlet = StreamOutlet(data_info, chunk_size=data_settings.push_chunk_size)
| |
| | | | |
| − | running = True
| + | data_outlet = StreamOutlet(data_info, chunk_size=1) |
| − | marker_outlet.push_sample(["Start"])
| |
| − | buffer_dtype = object if data_settings.channel_format == "string" else float
| |
| − | buffer_in = np.array(data_settings.default_buffer, dtype=buffer_dtype)
| |
| | | | |
| | + | # --- Start Streaming --- |
| | print("Start streaming... Press 'q' to stop.") | | print("Start streaming... Press 'q' to stop.") |
| | + | marker_outlet.push_sample(["Start"]) |
| | | | |
| − | while not(keyboard.is_pressed('q')): | + | while not keyboard.is_pressed('q'): |
| − | | + | random_numbers = [random.randint(1, 2) for _ in range(4)] |
| − | random_numbers = [random.randint(0, 1) for _ in range(4)] | |
| − | random_numbers = [num + 1 for num in random_numbers]
| |
| | data_outlet.push_sample(random_numbers) | | data_outlet.push_sample(random_numbers) |
| − | | + | time.sleep(0.01) |
| − | time.sleep(data_settings.push_interval) | |
| | | | |
| | marker_outlet.push_sample(["Stop"]) | | marker_outlet.push_sample(["Stop"]) |
| − | | + | print("Streaming stopped.") |
| − | </syntaxhighlight>
| |
| − | | |
| − | the settings.py file for LSL:
| |
| − | <syntaxhighlight lang="python" line>
| |
| − | # settings.py
| |
| − | | |
| − | class BaseSettings:
| |
| − | """Basisklasse met standaardinstellingen voor een LSL stream."""
| |
| − | def __init__(self):
| |
| − | self.stream_name = "BaseStream"
| |
| − | self.stream_type = "Unknown"
| |
| − | self.channel_count = 1
| |
| − | self.sample_rate = 0
| |
| − | self.channel_format = "float32"
| |
| − | self.source_id = "BaseSource"
| |
| − | | |
| − | self.channel_names = []
| |
| − | self.push_chunk_size = 1
| |
| − | self.push_interval = 0.001
| |
| − | | |
| − | self.use_random_data = False
| |
| − | self.dummy_string = "Test"
| |
| − | self.random_string_options = ["A", "B", "C"]
| |
| − | | |
| − | self.verbose = True
| |
| − | self.quit_method = "psychopy"
| |
| − | self.user_stop_message = "Press ENTER/RETURN to stop acquisition."
| |
| − | | |
| − | self.default_buffer = ["Test"]
| |
| − | | |
| − | # the scope is not implemented yet
| |
| − | self.scope = "lan"
| |
| − | | |
| − | def get_stream_options(self):
| |
| − | if self.scope in ["local", "lan", "internet"]:
| |
| − | return dict()
| |
| − | else:
| |
| − | raise ValueError(f"Unknown scope: {self.scope}")
| |
| − | | |
| − | | |
| − | class DynamicMultiChannelStreamSettings(BaseSettings):
| |
| − | def __init__(
| |
| − | self,
| |
| − | n_channels=8,
| |
| − | stream_name="DynamicStream",
| |
| − | stream_type="EEG",
| |
| − | sample_rate=256,
| |
| − | channel_format="float32",
| |
| − | channel_names=None
| |
| − | ):
| |
| − | super().__init__()
| |
| − | self.stream_name = stream_name
| |
| − | self.stream_type = stream_type
| |
| − | self.channel_count = n_channels
| |
| − | self.sample_rate = sample_rate
| |
| − | self.channel_format = channel_format
| |
| − | self.source_id = f"{stream_name}_Source"
| |
| − | | |
| − | if channel_names is not None:
| |
| − | if len(channel_names) != n_channels:
| |
| − | raise ValueError(f"Number of channel names ({len(channel_names)}) does not match n_channels ({n_channels})")
| |
| − | self.channel_names = channel_names
| |
| − | else:
| |
| − | self.channel_names = [f"Chan{i+1}" for i in range(n_channels)]
| |
| − | | |
| − | self.default_buffer = [0.0 for _ in range(n_channels)]
| |
| − | | |
| − | class MarkerStreamSettings(BaseSettings):
| |
| − | def __init__(self):
| |
| − | super().__init__()
| |
| − | self.stream_name = "MarkerStream"
| |
| − | self.stream_type = "Markers"
| |
| − | self.channel_count = 1
| |
| − | self.sample_rate = 0 # 0 Hz = irregular sampling
| |
| − | self.channel_format = "string"
| |
| − | self.source_id = "MarkerSource"
| |
| − | | |
| − | self.channel_names = [] # niet nodig voor markers
| |
| − | self.default_buffer = ["TestMarker"]
| |
| | </syntaxhighlight> | | </syntaxhighlight> |
| | | | |