LeetCode: Heaps

Table Of Content
Heaps intro
- What is a Heap
- Heap Characteristics
- Heap Representation
- Heap IRL
- Heap Application: Top K Element Extraction
- Heap Application: Dual Heap Balancing Representing Secondary Property
- Heap Application: K Way Merge for Sorted Streams
- Heap Application: Best First Search or Breadth First Search with Priority
- Heap Application: Kth element Within Iterating Sliding Window
- Heap Application: Interval Scheduling Optimization
621. Task Scheduler ::3:: - Medium
- Intro
- Abstraction
- Space & Time Complexity
- Brute Force:
- Find the Bug:
- Solution 1: Highest Occurring Tasks into Greedy Math Formula - Heap/Heap
- Solution 2: Max Heap Tick by Tick Simulation For Count Occurrences - Heap/Heap
- Solution 3: Max Heap Time Jump Optimization Simulation for Count Occurrences - Heap/Heap
Heaps intro
LeetCode problems with heap solutions.
What is a Heap
Heaps are specialized complete binary tree structures used to prioritize data.
A heap maintains a partial order property:
- MinHeap: Parent nodes are less than or equal to their children (smallest element at root)
- MaxHeap: Parent nodes are greater than or equal to their children (largest element at root)
The tree is complete: all levels are fully filled except possible the last, which is filled left to right
Peek(): efficient access to min/max element in O(1) time Insert()/Pop(): Heap edit in O(log n) time
Heap Characteristics
Heaps are characterized by:
- Nodes: Contain values arranged by heap property
- Complete Tree: Shape property ensures tree is always balanced
- Heap Property: Parent value ≤ (minHeap) or Parent value ≥ children values
- Nothing beyond heap property: No ordering between siblings or subtrees maintained, other than parent child from heap property
- Array: Implemented as array for space and cache efficiency
Heap Representation
Tree heap represented as an array:
Array Form
Index: 0 1 2 3 4 5 6
Value: [3, 5, 8, 9, 10, 12, 15]
Tree Form
(0)3
/ \
(1)5 (2)8
/ \ / \
(3)9 (4)10 (5)12 (6)15
Heap IRL
Priority Queues: scheduling, bandwidth management Graph algorithms: Dijkstra's shortest path, Prim's MST Event simulation: process events by priority/time
Heap Application: Top K Element Extraction
We can maintain a heap to quickly retrieve the top K largest or smallest elements without fully sorting the input.
Ex: Find the kth largest element in array
def kthLargest(nums, k):
heap = []
for num in nums:
heapq.heappush(heap, num)
if len(heap) > k:
heapq.heappop(heap) # Keep only top K elements
return heap[0]
# Example: kthLargest([3, 2, 1, 5, 6, 4], 2) -> 5
Heap Application: Dual Heap Balancing Representing Secondary Property
We can use two heaps, maxHeap for lower half of values and a minHeap for the upper half, to maintain a property while adding and removing elements. Pattern supports quick lookups of medians, balance points, or range constraints.
Ex: Maintain median of stream.
class MedianFinder:
def __init__(self):
self.low = [] # Max-heap (invert values)
self.high = [] # Min-heap
def addNum(self, num: int) -> None:
heapq.heappush(self.low, -num)
heapq.heappush(self.high, -heapq.heappop(self.low))
if len(self.low) < len(self.high):
heapq.heappush(self.low, -heapq.heappop(self.high))
def findMedian(self) -> float:
if len(self.low) > len(self.high):
return -self.low[0]
return (-self.low[0] + self.high[0]) / 2
Heap Application: K Way Merge for Sorted Streams
A minHeap can efficiently merge multiple sorted lists or streams by always extracting the next smallest element across all inputs.
Ex: Merge k sorted lists into one sorted lists
def mergeKSorted(lists):
heap = []
for i, lst in enumerate(lists):
if lst:
heapq.heappush(heap, (lst[0], i, 0))
result = []
while heap:
val, list_idx, elem_idx = heapq.heappop(heap)
result.append(val)
if elem_idx + 1 < len(lists[list_idx]):
heapq.heappush(heap, (lists[list_idx][elem_idx+1], list_idx, elem_idx+1))
return result
# mergeKSorted([[1,4,5],[1,3,4],[2,6]]) -> [1,1,2,3,4,4,5,6]
Heap Application: Best First Search or Breadth First Search with Priority
A heap can drive a search algorithm where you can expand the 'best' candidate first: A*, Dijkstra's, Prims, etc. In a non negative weighted graph context: a minHeap can be used to expand on the closest node first in a This always ensures that the next node popped from the heap has the smallest known distance from the source.
Ex: Dijkstra's Algorithm MinHeap
def dijkstra(graph, start):
# Graph: Node -> list of (neighbor, weight) tuples
# start: Starting node for shortest path search
# Result:
# dist: shortest distance from start to all other nodes
# Init all distances to infinity,
# except start node which has distance 0
dist = {node: float('inf') for node in graph}
dist[start] = 0
# minHeap priority queue stores tuples of (distance, node)
# iterate from start node and commence exploring neighbors
heap = [(0, start)]
# while there are nodes to process in heap
while heap:
# grab node with smallest known distance from start
d, node = heapq.heappop(heap)
# Skip if we already have already explored a shorter
# path to the current node
if d > dist[node]:
continue
# Check each neighbor and see if we have found
# a shorter path through the current node
for nei, w in graph[node]:
# grab new distance
nd = d + w
# if shorter path found, update and push to heap
if nd < dist[nei]:
dist[nei] = nd
heapq.heappush(heap, (nd, nei))
# overall: time complexity
# overall: space complexity
return dist
Heap Application: Kth element Within Iterating Sliding Window
A heap can track max/min within a sliding window efficiently when combined with lazy deletion or index tracking.
Ex: Find maximum in each sliding windows
def max_sliding_window(nums, k):
# maxHeap via minHeap with negative values
# initialize with first k elements
heap = [(-nums[i], i) for i in range(k)]
heapq.heapify(heap)
# root of heap is the largest in the current window
res = [-heap[0][0]]
# iterate sliding window forward
for i in range(k, len(nums)):
# push new element to heap
heapq.heappush(heap, (-nums[i], i))
# remove top element if they are outside current window
while heap[0][1] <= i - k:
heapq.heappop(heap)
# after removing out of window elements, heap root is max
# within the current window
res.append(-heap[0][0])
return res
Heap Application: Interval Scheduling Optimization
A minHeap can track the earliest finishing times among active intervals, enabling optimal scheduling of jobs or meetings without conflicts.
Ex: Find the minimum number of meeting rooms required
def minMeetingRooms(intervals):
# Sort intervals by their start time,
# ensures we process meetings in chronological order
intervals.sort(key=lambda x: x[0])
# minHeap to track end time of ongoing meetings,
# storing the earliest ending meeting at the top
heap = []
# iterate over rooms in earliest starting order
for start, end in intervals:
# if heap is not empty, and the earliest ending meeting the earliest meeting ends
# ends before or exactly when the new meeting starts,
# then we can remove the earliest ending meeting
# (essentially freeing up a room)
if heap and heap[0] <= start:
# pop meeting that ended the earliest
heapq.heappop(heap)
# 'start' the current meeting by pushing it to the heap
heapq.heappush(heap, end)
# last heap size is the number meetings running
# concurrently after all meetings have started
return len(heap)
# Example:
# minMeetingRooms([[0,30],[5,10],[15,20]]) -> 2
703. Kth Largest Element in a Stream ::1:: - Easy
Topics: Tree, Design, Binary Search Tree, Heap (Priority Queue), Binary Tree, Data Stream
Intro
You are part of a university admissions office and need to keep
track of the kth highest test score from applicants in real-time. This helps to determine cut-off marks for interviews and admissions dynamically as new applicants submit their scores. You are tasked to implement a class which, for a given integer k, maintains a stream of test scores and continuously returns the kth highest test score after a new score has been submitted. More specifically, we are looking for the kth highest score in the sorted list of all scores. Implement the KthLargest class: KthLargest(int k, int[] nums) Initializes the object with the integer k and the stream of test scores nums. int add(int val) Adds a new test score val to the stream and returns the element representing the kth largest element in the pool of test scores so far.
Example Input | Output |
---|---|
["KthLargest", "add", "add", "add", "add", "add"] [[3, [4, 5, 8, 2]], [3], [5], [10], [9], [4]] | [null, 4, 5, 5, 8, 8] |
["KthLargest", "add", "add", "add", "add"] [[4, [7, 7, 7, 7, 8, 3]], [2], [10], [9], [9]] | [null, 7, 7, 7, 8] |
Constraints:
0 ≤ nums.length ≤ 104
1 ≤ k ≤ nums.length + 1
-104 ≤ nums[i] ≤ 104
-104 ≤ val ≤ 104
At most 104 calls will be make to add.
Abstraction
Implement a heap that will keep track of the kth highest score for a data stream.
Space & Time Complexity
Solution | Time Complexity | Space Complexity | Time Remark | Space Remark |
---|---|---|---|---|
Bug | Error |
---|---|
Brute Force:
Aspect | Time Complexity | Space Complexity | Time Remarks | Space Remarks |
---|---|---|---|---|
Find the Bug:
Solution 1: Min Heap of Size k - Heap/Heap
class KthLargest:
def __init__(self, k: int, nums: list[int]):
# Note:
# 1. MinHeap of size k containing the k largest elements seen so far.
# Result: Top k scores saved, with lowest top score at root
# size of minHeap
self.k = k
# minHeap
self.minHeap = []
# stream input data
for num in nums:
self.add(num)
# time complexity:
# space complexity:
def add(self, val: int) -> int:
# Note:
# 1. if minHeap is less than capacity, simply push
# 2. if minHeap is out of space, push only if new score beats lowest score (root)
# heap still has space, simply push
if len(self.minHeap) < self.k:
heapq.heappush(self.minHeap, val)
# heap is out of space, check if valid add
else:
# push if new score beats lowest high score (root)
smallestTopScore = self.minHeap[0]
if val > smallestTopScore:
heapq.heapreplace(self.minHeap, val)
# return top score
return self.minHeap[0]
# overall: time complexity
# overall: space complexity
1046. Last Stone Weight ::1:: - Easy
Topics: Array, Heap (Priority Queue)
Intro
You are given an array of integers stones where stones[i] is the weight of the ith stone. We are playing a game with the stones. On each turn, we choose the heaviest two stones and smash them together. Suppose the heaviest two stones have weights x and y with x ≤ y. The result of this smash is: If x == y, both stones are destroyed, and If x != y, the stone of weight x is destroyed, and the stone of weight y has new weight y - x. At the end of the game, there is at most one stone left. Return the weight of the last remaining stone. If there are no stones left, return 0.
Example Input | Output |
---|---|
stones = [2,7,4,1,8,1] | 1 |
stones = [1] | 1 |
Constraints:
1 ≤ stones.length ≤ 30
1 ≤ stones[i] ≤ 1000
Abstraction
Stone smashing game. Given some amount of stones, a round consists of smashing the two heaviest stones together. If they are the same weight both stones are destroyed. If one is heavier than the other, the heavier stone loses weight equal to the weight of the less heavy stone.
Space & Time Complexity
Solution | Time Complexity | Space Complexity | Time Remark | Space Remark |
---|---|---|---|---|
Bug | Error |
---|---|
Brute Force:
Aspect | Time Complexity | Space Complexity | Time Remarks | Space Remarks |
---|---|---|---|---|
Find the Bug:
Solution 1: Max Heap - Heap/Heap
def lastStoneWeight(self, stones: list[int]) -> int:
# Note:
# 1. MaxHeap via MinHeap with negative values
# 2. Continue smashing pairs of stones until heap size 1
# Result: final stone weight or all stones destroyed
# turn list to negative numbers
maxHeap = [-stone for stone in stones]
# transform list into heap, in place, in linear time
heapq.heapify(maxHeap)
# remove top two stones until one stone left
while len(maxHeap) > 1:
# grab two heaviest stones
first = -(heapq.heappop(maxHeap))
second = -(heapq.heappop(maxHeap))
# push if partial stone remains
if first != second:
remainingStoneWeight = -(first - second)
heapq.heappush(maxHeap, remainingStoneWeight)
# check if stone remains
res = -maxHeap[0] if maxHeap else 0
# overall: time complexity
# overall: space complexity
return res
973. K Closest Points to Origin ::2:: - Medium
Topics: Array, Math, Divide and Conquer, Geometry, Sorting, Heap (Priority Queue), Quickselect
Intro
Given an array of points where points[i] = [x_i, y_i] represents a point on the X-Y plane and an integer k, return the k closest points to the origin (0, 0). The distance between two points on the X-Y plane is the Euclidean distance: sqrt((x_1, x-2)^2 + (y_1 + y_2)^2) You may return the answer in any order. The answer is guaranteed to be unique (except for the order that it is in).
Example Input | Output |
---|---|
points = [[1,3],[-2,2]], k = 1 | [[-2, 2]] |
points = [[3,3],[5,-1],[-2,4]], k = 2 | [[3,3],[-2,4]] |
Constraints:
1 ≤ k ≤ points.length ≤ 104
-104 ≤ x_1, y_i ≤ 104
Abstraction
Given a list of point on a X-Y grid, find the k closest points to the origin.
Space & Time Complexity
Solution | Time Complexity | Space Complexity | Time Remark | Space Remark |
---|---|---|---|---|
Bug | Error |
---|---|
Brute Force:
Aspect | Time Complexity | Space Complexity | Time Remarks | Space Remarks |
---|---|---|---|---|
Find the Bug:
Solution 1: Max Heap of size k - Heap/Heap
def kClosest(self, points: list[list[int]], k: int) -> list[list[int]]:
# Note:
# 1. We want the k points with the smallest Euclidean distances from (0, 0).
# 2. Instead of computing sqrt(x^2 + y^2), we can use the squared distance x^2 + y^2
# because sqrt is monotonic and doesn't change order.
# 3. Approach:
# - Use a max-heap (store negative distances) of size k.
# - Iterate over points:
# if heap size < k: push (-distance, point)
# else if current distance < largest distance in heap: pop and push
# - This ensures we always keep k closest points in O(n log k) time.
# (-distance, [x, y])
maxHeap = []
# add all distances to maxHeap
for x, y in points:
# square distance
dist = x * x + y * y
# if maxHeap has space, simply push
if len(maxHeap) < k:
heapq.heappush(maxHeap, (-dist, [x, y]))
# if maxHeap is full, check if push is required
else:
# if closer than farthest point
if dist < -maxHeap[0][0]:
heapq.heapreplace(maxHeap, (-dist, [x, y]))
# grab list of coords
res = [xy for (_, xy) in maxHeap]
# overall: time complexity
# overall: space complexity
return res
Solution 2: QuickSelect - Heap/Heap
def kClosest(self, points: list[list[int]], k: int) -> list[list[int]]:
# Note:
# Classic Quick Select but with square distance
# sqrt() is monotonic so we can ignore it
# squared distance to origin
def dist(i: int) -> int:
return points[i][0] ** 2 + points[i][1] ** 2
def partition(left, right, randPivotIndex) -> int:
# prepare partition
randPivotDist = dist(randPivotIndex)
points[randPivotIndex], points[right] = points[right], points[randPivotIndex]
leftPartition = left
# partition all closer points to left
for i in range(left, right):
# if point is closer to origin (lower)
if dist(i) < randPivotDist:
points[leftPartition], points[i] = points[i], points[leftPartition]
leftPartition += 1
# restore pivot
points[right], points[leftPartition] = points[leftPartition], points[right]
# return
return leftPartition
def quickselect(left, right, finalIndex):
# Base case:
if left >= right:
return
# random pivot and result
randPivotIndex = random.randint(left, right)
resPivot = partition(left, right, randPivotIndex)
# target check
if resPivot == finalIndex:
return
elif resPivot < finalIndex:
quickselect(resPivot + 1, right, finalIndex)
else:
quickselect(left, resPivot - 1, finalIndex)
# boundary
n = len(points)
# quickSelect on bounds
quickselect(0, n - 1, k)
# grab lowest k points
res = points[:k]
# overall: time complexity
# overall: space complexity
return res
215. Kth Largest Element in an Array ::2:: - Medium
Topics: Array, Math, Divide and Conquer, Geometry, Sorting, Heap (Priority Queue), Quickselect
Intro
Given an integer array nums and an integer k, return the kth largest element in the array. Note that it is the kth largest element in the sorted order, not the kth distinct element. Can you solve it without sorting?
Example Input | Output |
---|---|
nums = [3,2,1,5,6,4], k = 2 | 5 |
nums = [3,2,3,1,2,4,5,5,6], k = 4 | 4 |
Constraints:
1 ≤ k ≤ nums.length ≤ 105
-104 ≤ nums[i] ≤ 104
Abstraction
Given an unsorted array, find the kth largest element in sorted order.
Space & Time Complexity
Solution | Time Complexity | Space Complexity | Time Remark | Space Remark |
---|---|---|---|---|
Bug | Error |
---|---|
Brute Force:
Aspect | Time Complexity | Space Complexity | Time Remarks | Space Remarks |
---|---|---|---|---|
Find the Bug:
Solution 1: Min Heap of Size k - Heap/Heap
def findKthLargest(self, nums: list[int], k: int) -> int:
# Note:
# 1. MinHeap of size k
# Result: root of minHeap contains kth largest element
# heap
minHeap = []
# for each element
for num in nums:
# if heap has space, simply push
if len(minHeap) < k:
heapq.heappush(minHeap, num)
# if heap has no space, check if push is necessary
else:
# push if new num is larger than smallest in minHeap,
# this will force a new kth largest element
if num > minHeap[0]:
heapq.heapreplace(minHeap, num)
# kth largest is at the root of the minHeap
kth = minHeap[0]
# overall: time complexity
# overall: space complexity
return kth
Solution 2: Modified Ignore Duplicates Quick Select - Heap/Heap
def findKthLargest(self, nums: List[int], k: int) -> int:
# Note:
# Optimized Quick Select which will remove 1 subList of duplicates per iteration
# 1. RandPivot
# 2. Partition nums by creating larger, equal, or smaller than pivot subLists
# 3. Recurse into the correct subList
# Result: kth largest found while accounting for potential stream of duplicates
# Empty check
if not nums:
return
# random pivot
pivot = random.choice(nums)
# Partition nums into three parts:
# bigger: elements greater than pivot
bigger = [num for num in nums if num > pivot]
# if kth is within bigger, recurse
if k <= len(bigger):
return self.findKthLargest(bigger, k)
# Duplicate Optimization
# equal: elements equal to pivot
equal = [num for num in nums if num == pivot]
# If kth largest is within biggest + equal count,
# kth will be within partition duplicate list
if k <= len(bigger) + len(equal):
return equal[0]
# kth must be within smaller list, recurse
# update k to ignore length of bigger and equal list
smaller = [num for num in nums if num < pivot]
return self.findKthLargest(smaller, k-len(bigger)-len(equal))
621. Task Scheduler ::3:: - Medium
Topics: Array, Hash Table, Greedy, Sorting, Heap (Priority Queue), Counting, Computer Architecture Scheduling
Intro
You are given an array of CPU tasks, each labeled with a letter from A to Z, and a number n. Each CPU interval can be idle or allow the completion of one task. Tasks can be completed in any order, but there's a constraint: there has to be a gap of at least n intervals between two tasks with the same label. Return the minimum number of CPU intervals required to complete all tasks.
Example Input | Output |
---|---|
tasks = ["A","A","A","B","B","B"], n = 2 | 8 |
["A","C","A","B","D","B"], n = 1 | 6 |
tasks = ["A","A","A", "B","B","B"], n = 3 | 10 |
Constraints:
1 ≤ tasks.length ≤ 105
tasks[i] is an uppercase English letter
0 ≤ n ≤ 100
Abstraction
Given a list of tasks marked by uppercase English letters and a minimum distance n between identical tasks, find how many cycles it would take to finish all tasks.
Space & Time Complexity
Solution | Time Complexity | Space Complexity | Time Remark | Space Remark |
---|---|---|---|---|
Bug | Error |
---|---|
Brute Force:
Aspect | Time Complexity | Space Complexity | Time Remarks | Space Remarks |
---|---|---|---|---|
Find the Bug:
Solution 1: Highest Occurring Tasks into Greedy Math Formula - Heap/Heap
def leastInterval(self, tasks: list[str], n: int) -> int:
# Note:
# 1. Highest occurring tasks + idle time determine the min scheduling
# 2. Total number of tasks determine the min scheduling
# Result: min scheduling for list of tasks
# Count frequency of each task (A-Z)
freq = defaultdict(int)
for task in tasks:
freq[task] += 1
# highest number of occurrences for single task
maxFreq = max(freq.values())
# number of tasks with highest number of occurrences
maxCount = sum(1 for count in freq.values() if count == maxFreq)
# The minimum time is based on arranging the most frequent tasks spaced by n:
# ex:
# A, B, C share freq count of 9
# idle interval n = 5
#
# so:
# [A -> B -> C -> idle -> idle...] * 8 times
# + [A -> B -> C] last group (notice no idles needed)
# (maxFreq-1) -> 8 times = groups
# (n+1) = length of group (n idle time + 1 for task occurrence
# + maxCount for last group
intervals = (maxFreq - 1) * (n + 1) + maxCount
# if totalTasks is higher than intervals, totalTasks is the lower bound
# ie: A,B,C combo takes 51
# however, rest of alphabet all has 4 count, leading to 119
# we will take the 119
# (in this case, we wont have to use idle)
res = max(intervals, len(tasks))
# overall: time complexity
# overall: space complexity
return res
Solution 2: Max Heap Tick by Tick Simulation For Count Occurrences - Heap/Heap
def leastInterval(self, tasks: list[str], n: int) -> int:
# Note:
# Simulates task scheduling process
# 1. MaxHeap picks current most frequent remaining task first
# 2. Queue (FIFO) simulates cool down to store tasks that have been run
# but must wait 'n' intervals before they can be scheduled again
# 3. Each iteration represents 1 unit of CPU time
# 4. Iterate until both MaxHeap and Queue are empty
# Result: time is total CPU time needed, including idle intervals
# counts
freq = Counter(tasks)
# MaxHeap with negative counts
maxHeap = [-count for count in freq.values()]
heapq.heapify(maxHeap)
# total CPU time simulated
time = 0
# Queue to hold cooldown tasks: (ready_time, count)
cooldown = deque()
# until both are empty
while maxHeap or cooldown:
# 1 CPU tick
time += 1
# grab most frequent task
if maxHeap:
# remove most freq task from root, and decrement by 1
cnt = heapq.heappop(maxHeap) + 1
if cnt != 0:
# put task into cool down,
# and calculate time until next available use
cooldown.append((time + n, cnt))
# check if any task in cool down is ready to be added back to maxHeap
if cooldown and cooldown[0][0] == time:
# pop FIFO
_, cnt = cooldown.popleft()
# push task occurrence count back to maxHeap
heapq.heappush(maxHeap, cnt)
# overall: time complexity
# overall: space complexity
return time
Solution 3: Max Heap Time Jump Optimization Simulation for Count Occurrences - Heap/Heap
def leastInterval(self, tasks: List[str], n: int) -> int:
# Note:
# Simulates task scheduling with a MaxHeap + cool down queue
# 1. MaxHeap picks current most frequent remaining task first
# 2. Queue (FIFO) simulates cool down to store tasks that have been run
# but must wait 'n' intervals before they can be scheduled again
# 3. if MaxHeap is empty but cool down queue is not, we 'jump'
# directly to the next ready task instead of simulating idle ticks,
#. skipping over unnecessary +1 increments and speeds up simulation
# Result: time is total CPU time needed, including idle intervals,
# with fewer iterations
# counts
freq = Counter(tasks)
# MaxHeap with negative counts
maxHeap = [-count for count in freq.values()]
heapq.heapify(maxHeap)
# total CPU time simulated
time = 0
# Queue to hold cooldown tasks: (-cnt, idleTime)
q = deque()
# until both are empty
while maxHeap or q:
# CPU tick
time += 1
# if maxHeap is empty
if not maxHeap:
# time 'jump' to next cool down
time = q[0][1]
# maxHeap is non empty
else:
# remove most freq task from root, and decrement by 1
cnt = 1 + heapq.heappop(maxHeap)
if cnt:
# put task into cool down,
# and calculate time until next available use
q.append([cnt, time + n])
# check if any task in cool down is ready to be added back to maxHeap
if q and q[0][1] == time:
# pop FIFO
(cnt, idleTime) = q.popleft()
heapq.heappush(maxHeap, cnt)
# overall: time complexity
# overall: space complexity
return time
355. Design Twitter ::2:: - Medium
Topics: Hash Table, Linked List, Design, Heap (Priority Queue)
Intro
Design a simplified version of Twitter where users can post tweets, follow/unfollow another user, and is able to see the 10 most recent tweets in the user's news feed. Implement the Twitter class: Twitter() Initializes your twitter object. void postTweet(int userId, int tweetId) Composes a new tweet with ID tweetId by the user userId. Each call to this function will be made with a unique tweetId. List[Integer] getNewsFeed(int userId) Retrieves the 10 most recent tweet IDs in the user's news feed. Each item in the news feed must be posted by users who the user followed or by the user themself. Tweets must be ordered from most recent to least recent. void follow(int followerId, int followeeId) The user with ID followerId started following the user with ID followeeId. void unfollow(int followerId, int followeeId) The user with ID followerId started unfollowing the user with ID followeeId.
Example Input | Output |
---|---|
["Twitter", "postTweet", "getNewsFeed", "follow", "postTweet", "getNewsFeed", "unfollow", "getNewsFeed"] [[], [1, 5], [1], [1, 2], [2, 6], [1], [1, 2], [1]] | [null, null, [5], null, null, [6, 5], null, [5]] |
Constraints:
1 ≤ userId, follwerId, followeeId ≤ 500
0 ≤ tweetId ≤ 104
All the tweets have unique IDs.
At most 3 * 104 calls will be made to postTweet, getNewsFeed, follow, and unfollow
A user cannot follow themselves.
Abstraction
Design a miniTwitter that allows for efficient news feed and postTweet. Boils down to desicions on efficiently storing tweets and retrieving the top 10 recent ones. Also boils down to a merge k sorted lists question.
Space & Time Complexity
Solution | Time Complexity | Space Complexity | Time Remark | Space Remark |
---|---|---|---|---|
Bug | Error |
---|---|
Brute Force:
Aspect | Time Complexity | Space Complexity | Time Remarks | Space Remarks |
---|---|---|---|---|
Find the Bug:
Solution 1: Store Tweets as Lists + Min Heap Merge - Heap/Heap
class Twitter:
def __init__(self):
# IRL Use Case:
# Balanced usage scenario users follow moderate number of accounts, tweet counts are not huge.
# Note:
# Tweets: Stores tweets as deques (lists) per user.
# News Feed: Merges k sorted lists using a maxHeap to always pick the most
# recent tweet across the user and their followees.
# Pros: Simple implementation, efficient for moderate tweet counts.
# Cons: Heap merge has some overhead; memory grows with many tweets per user.
self.followees = defaultdict(set)
self.tweets = defaultdict(deque)
self.time = 0
self.FEED_SIZE = 10
# time complexity:
# space complexity:
def postTweet(self, userId: int, tweetId: int) -> None:
# Note:
#
self.time += 1
self.tweets[userId].append((self.time, tweetId))
if len(self.tweets[userId]) > 100:
self.tweets[userId].popleft()
# time complexity:
# space complexity:
def follow(self, followerId: int, followeeId: int) -> None:
# Note:
#
if followerId != followeeId:
self.followees[followerId].add(followeeId)
# time complexity:
# space complexity:
def unfollow(self, followerId: int, followeeId: int) -> None:
# Note:
#
self.followees[followerId].discard(followeeId)
# time complexity:
# space complexity:
def getNewsFeed(self, userId: int) -> list[int]:
# Note:
#
# heap stores (-timestamp, tweetId, userId, index_in_user_tweets)
heap = []
# The list of users to fetch tweets from: self + followees
users = self.followees[userId] | {userId}
# Init pointers for each user tweet list (start from newest)
for u in users:
if self.tweets[u]:
# Index of last tweet in deque (newest)
idx = len(self.tweets[u]) - 1
timestamp, tweetId = self.tweets[u][idx]
heapq.heappush(heap, (-timestamp, tweetId, u, idx))
result = []
while heap and len(result) < self.FEED_SIZE:
neg_time, tweetId, u, idx = heapq.heappop(heap)
result.append(tweetId)
# Move pointer to next newest tweet if available
if idx > 0:
idx -= 1
timestamp, tweetId = self.tweets[u][idx]
heapq.heappush(heap, (-timestamp, tweetId, u, idx))
return result
# overall: time complexity
# overall: space complexity
Solution 2: Store Tweets as Linked Lists + Merge K Sorted Lists - Heap/Heap
class TweetNode:
# IRL Use Case:
# 'Celebrity' users with many tweets; avoids indexing overhead when fetching top 10.
# Note:
# Tweets: Linked list node for tweets, storing tweetId, timestamp, and next pointer
# News Feed: MaxHeap merges the heads of k linked lists, picking the newest tweet each time to produce top 10.
# Pros: Handles users with very large tweet histories efficiently.
# Cons: Pointer overhead; more complex than simple list implementation.
def __init__(self, tweetId, time):
self.tweetId = tweetId
self.time = time
self.next = None
class Twitter:
def __init__(self):
# Note:
# Stores tweets as linked lists.
# News feed built by max-heap merging list heads.
# Pros/Cons as above.
# Map userId to head of linked list of tweets
self.tweets = {}
# Map userId to set of followed userIds
self.followees = defaultdict(set)
self.time = 0
self.FEED_SIZE = 10
def postTweet(self, userId: int, tweetId: int) -> None:
# Note:
#
self.time += 1
node = TweetNode(tweetId, self.time)
node.next = self.tweets.get(userId, None)
self.tweets[userId] = node
def follow(self, followerId: int, followeeId: int) -> None:
# Note:
#
if followerId != followeeId:
self.followees[followerId].add(followeeId)
def unfollow(self, followerId: int, followeeId: int) -> None:
# Note:
#
self.followees[followerId].discard(followeeId)
def getNewsFeed(self, userId: int) -> list[int]:
# Note:
#
# Users to consider: self + followees
users = self.followees[userId] | {userId}
# Build a max heap of (-time, TweetNode) for heads of all tweet lists
heap = []
for u in users:
if self.tweets.get(u):
heapq.heappush(heap, (-self.tweets[u].time, self.tweets[u]))
result = []
while heap and len(result) < self.FEED_SIZE:
neg_time, node = heapq.heappop(heap)
result.append(node.tweetId)
if node.next:
heapq.heappush(heap, (-node.next.time, node.next))
return result
# overall: time complexity
# overall: space complexity
Solution 3: Store Tweets as Arrays/Deques + Dynamic Heap Merge - Heap/Heap
class Twitter:
def __init__(self):
# IRL Use Case:
# Most practical for IRL apps where MOST users have few tweets and follow a manageable number of accounts.
# Slightly more efficient than Solution 1 due to index-based access for news feed construction.
# Note:
# Tweets: Stores tweets as deques per user, uses indices to access latest tweets.
# News Feed: Max-heap merges the most recent tweets from each user for news feed.
# Pros: Simple, practical, low overhead, works well with typical usage.
# Cons: Slightly more complex than naive list append-only approach; still has heap overhead.
# Map userId to list of (timestamp, tweetId)
self.tweets = defaultdict(deque)
# Map userId to set of followed userIds
self.followees = defaultdict(set)
# Global timestamp for ordering
self.time = 0
self.FEED_SIZE = 10
def postTweet(self, userId: int, tweetId: int) -> None:
# Increment timestamp
self.time += 1
self.tweets[userId].append((self.time, tweetId))
# Optional: limit stored tweets to last 100 per user
if len(self.tweets[userId]) > 100:
self.tweets[userId].popleft()
def follow(self, followerId: int, followeeId: int) -> None:
if followerId != followeeId:
self.followees[followerId].add(followeeId)
def unfollow(self, followerId: int, followeeId: int) -> None:
self.followees[followerId].discard(followeeId)
def getNewsFeed(self, userId: int) -> list[int]:
# Users to pull tweets from: self + followees
users = self.followees[userId] | {userId}
heap = []
# Initialize heap with most recent tweet of each user
for u in users:
if self.tweets[u]:
idx = len(self.tweets[u]) - 1
ts, tid = self.tweets[u][idx]
heapq.heappush(heap, (-ts, tid, u, idx))
result = []
while heap and len(result) < self.FEED_SIZE:
neg_ts, tid, u, idx = heapq.heappop(heap)
result.append(tid)
if idx > 0:
idx -= 1
ts, tid = self.tweets[u][idx]
heapq.heappush(heap, (-ts, tid, u, idx))
return result
295. Find Median from Data Stream ::1:: - Hard
Topics: Two Pointers, Design, Sorting, Heap (Priority Queue), Data Stream
Intro
The median is the middle value in an ordered integer list. If the size of the list is even, there is no middle value, and the median is the mean of the two middle values. For example, for arr = [2,3,4], the median is 3. For example, for arr = [2,3], the median is (2 + 3) / 2 = 2.5. Implement the MedianFinder class: MedianFinder() initializes the MedianFinder object. void addNum(int num) adds the integer num from the data stream to the data structure. double findMedian() returns the median of all elements so far. Answers within 10-5 of the actual answer will be accepted.
Example Input | Output |
---|---|
["MedianFinder", "addNum", "addNum", "findMedian", "addNum", "findMedian"] [[], [1], [2], [], [3], []] | [null, null, null, 1.5, null, 2.0] |
Constraints:
-105 ≤ num ≤ 105
There will be at least one element in the data structure before calling findMedian.
At most 5 * 104 calls will be made to addNum and findMedian
Abstraction
Find median value from a stream of data.
Space & Time Complexity
Solution | Time Complexity | Space Complexity | Time Remark | Space Remark |
---|---|---|---|---|
Bug | Error |
---|---|
Brute Force:
Aspect | Time Complexity | Space Complexity | Time Remarks | Space Remarks |
---|---|---|---|---|
Find the Bug:
Solution 1: MaxHeap + MinHeap Median Finder - Heap/Heap
class MedianFinder:
# Note:
# Two
def __init__(self):
# MaxHeap stores smaller half,
# all numbers less than or equal to the median
self.maxHeap = []
# MinHeap stores larger half,
# all numbers greater than or equal to the median
self.minHeap = []
# time complexity:
# space complexity:
def addNum(self, num: int) -> None:
# Note:
# Two heaps track lower and upper half
#
# Add new number to maxHeap
heapq.heappush(self.maxHeap, -num)
# Ensure all elements in maxHeap greater than every element in minHeap
# move smallest largest from maxHeap to minHeap if out of order
if (self.minHeap and (-self.maxHeap[0] > self.minHeap[0])):
val = -heapq.heappop(self.maxHeap)
heapq.heappush(self.minHeap, val)
# Balance heaps:
# MaxHeap should have 1 more element than minHeap
# if minHeap is larger, move minHeap root to maxHeap
if len(self.maxHeap) < len(self.minHeap):
#
val = heapq.heappop(self.minHeap)
#
heapq.heappush(self.maxHeap, -val)
# if maxHeap is larger by more than 1, move maxHeap root to minHeap
elif len(self.maxHeap) > len(self.minHeap) + 1:
#
val = -heapq.heappop(self.maxHeap)
#
heapq.heappush(self.minHeap, val)
# time complexity:
# space complexity:
def findMedian(self) -> float:
# Note:
# Median retrieval logic:
# 1. if odd -> root of MaxHeap (largest in smaller half)
# 2. if even -> average of roots from both heaps
# odd
if len(self.maxHeap) > len(self.minHeap):
return -self.maxHeap[0]
# even
average = (-self.maxHeap[0] + self.minHeap[0]) / 2
return average
Solution 2: Balanced BST - Heap/Heap
class MedianFinder:
def __init__(self):
# Note:
# balanced binary tree
self.sl = SortedList()
# time complexity:
# space complexity:
def addNum(self, num: int) -> None:
# insert in sorted order
self.sl.add(num)
# time complexity:
# space complexity:
def findMedian(self) -> float:
# len
n = len(self.sl)
# odd
if n % 2 == 1:
return self.sl[n // 2]
# even
average = (self.sl[n // 2 - 1] + self.sl[n // 2]) / 2
return average
# overall: time complexity
# overall: space complexity