Changes

Jump to navigation Jump to search
Line 49: Line 49:  
A short example for sending LSL streaming data:
 
A short example for sending LSL streaming data:
 
<syntaxhighlight lang="python" line>
 
<syntaxhighlight lang="python" line>
#!/usr/bin/env python
+
#!/usr/bin/env python3.10
 +
# -*- coding: utf-8 -*-
   −
import threading
+
import time
from pylsl import StreamInfo, StreamOutlet
+
import numpy as np
 +
import keyboard
 +
import random
 +
from pylsl import StreamInfo, StreamOutlet, local_clock
 +
from LSLsettings import MarkerStreamSettings, DynamicMultiChannelStreamSettings
   −
def getData():
+
print(f"Setting up streams...")
    while running:
+
marker_settings = MarkerStreamSettings()
        buffer_in = getSensorData()
  −
        send_data = True
  −
        time.sleep(0.001)
     −
info = StreamInfo(
+
# info marker StreamInfo, outlet
     name='MyStream',  
+
marker_info = StreamInfo(
     type='COP',  
+
     name=marker_settings.stream_name,
     channel_count=4,  
+
     type=marker_settings.stream_type,
     nominal_srate = 200,  
+
     channel_count=marker_settings.channel_count,
     source_id='BalanceBoard_stream'
+
     nominal_srate=marker_settings.sample_rate,
    )
+
    channel_format=marker_settings.channel_format,
outlet = StreamOutlet(info)  
+
     source_id=marker_settings.source_id
 +
)
 +
marker_outlet = StreamOutlet(marker_info, chunk_size=marker_settings.push_chunk_size)
   −
threading.Thread(target=getData).start()
+
data_settings = DynamicMultiChannelStreamSettings(
while running == True:
+
    n_channels=4,
if send_data:
+
    stream_name="TestStream",
outlet.push_chunk(buffer_in)
+
    stream_type="TestData",
send_data = False
+
    sample_rate=100,
</syntaxhighlight>
+
    channel_names=["A", "B", "C", "D"]
 +
)
   −
A short example for receiving LSL data:
+
# info data StreamInfo, outlet
<syntaxhighlight lang="python" line>
+
data_info = StreamInfo(
#!/usr/bin/env python
+
    name=data_settings.stream_name,
 +
    type=data_settings.stream_type,
 +
    channel_count=data_settings.channel_count,
 +
    nominal_srate=data_settings.sample_rate,
 +
    channel_format=data_settings.channel_format,
 +
    source_id=data_settings.source_id
 +
)
 +
data_outlet = StreamOutlet(data_info, chunk_size=data_settings.push_chunk_size)
   −
from pylsl import StreamInlet, resolve_stream
+
running = True
 +
marker_outlet.push_sample(["Start"])
 +
buffer_dtype = object if data_settings.channel_format == "string" else float
 +
buffer_in = np.array(data_settings.default_buffer, dtype=buffer_dtype)
   −
streams = resolve_stream('name', 'MyStream')
+
print("Start streaming... Press 'q' to stop.")
#streams = resolve_streams()
+
 
 +
while not(keyboard.is_pressed('q')):
 +
 
 +
    random_numbers = [random.randint(0, 1) for _ in range(4)]
 +
    random_numbers = [num + 1 for num in random_numbers]
 +
    data_outlet.push_sample(random_numbers)
 +
 
 +
    time.sleep(data_settings.push_interval)
 +
 
 +
marker_outlet.push_sample(["Stop"])
   −
inlet = StreamInlet(streams[0])
  −
while True:
  −
    sample, timestamp = inlet.pull_sample()
  −
    print(timestamp, sample)
   
</syntaxhighlight>
 
</syntaxhighlight>
  

Navigation menu